From 2eec3957202c4a804798114323e19be42f8a7f35 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 8 May 2023 11:14:59 +0800 Subject: [PATCH] cache/commit: flush cache when tsdb commits data --- source/dnode/vnode/src/inc/tsdb.h | 1 + source/dnode/vnode/src/inc/vnodeInt.h | 5 +++-- source/dnode/vnode/src/tsdb/tsdbCache.c | 22 ++++++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeCommit.c | 7 +++++-- 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c704472dff..95bce32196 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -346,6 +346,7 @@ struct STsdbFS { typedef struct { rocksdb_t *db; rocksdb_options_t *options; + rocksdb_flushoptions_t *flushoptions; rocksdb_writeoptions_t *writeoptions; rocksdb_readoptions_t *readoptions; rocksdb_writebatch_t *writebatch; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 69eacfa46e..94e5f253bf 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -178,6 +178,7 @@ int tsdbClose(STsdb** pTsdb); int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbPrepareCommit(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); +int32_t tsdbCacheCommit(STsdb* pTsdb); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbFinishCommit(STsdb* pTsdb); int32_t tsdbRollbackCommit(STsdb* pTsdb); @@ -194,9 +195,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, - int32_t type); + int32_t type); int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); -int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. +int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index d5c3b2a789..17aed62241 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -98,12 +98,19 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { goto _err3; } + rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create(); + if (NULL == flushoptions) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err4; + } + rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); pTsdb->rCache.writebatch = writebatch; pTsdb->rCache.options = options; pTsdb->rCache.writeoptions = writeoptions; pTsdb->rCache.readoptions = readoptions; + pTsdb->rCache.flushoptions = flushoptions; pTsdb->rCache.db = db; taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL); @@ -122,6 +129,7 @@ _err: static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_close(pTsdb->rCache.db); + rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions); rocksdb_writebatch_destroy(pTsdb->rCache.writebatch); rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); @@ -129,6 +137,20 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { taosThreadMutexDestroy(&pTsdb->rCache.rMutex); } +int32_t tsdbCacheCommit(STsdb *pTsdb) { + int32_t code = 0; + char *err = NULL; + + rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + code = -1; + } + + return code; +} + SLastCol *tsdbCacheDeserialize(char const *value) { if (!value) { return NULL; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 847125018c..74168591d2 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -144,8 +144,8 @@ _exit: } int vnodeShouldCommit(SVnode *pVnode, bool atExit) { - bool diskAvail = osDataSpaceAvailable(); - bool needCommit = false; + bool diskAvail = osDataSpaceAvailable(); + bool needCommit = false; taosThreadMutexLock(&pVnode->mutex); if (pVnode->inUse && diskAvail) { @@ -439,6 +439,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { code = tsdbCommit(pVnode->pTsdb, pInfo); TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbCacheCommit(pVnode->pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + if (VND_IS_RSMA(pVnode)) { code = smaCommit(pVnode->pSma, pInfo); TSDB_CHECK_CODE(code, lino, _exit); -- GitLab