diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c704472dffc5ee5c0be877d288b0a52ffdbfc01a..95bce321964b432182aaeef556bec8e52509e31e 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -346,6 +346,7 @@ struct STsdbFS { typedef struct { rocksdb_t *db; rocksdb_options_t *options; + rocksdb_flushoptions_t *flushoptions; rocksdb_writeoptions_t *writeoptions; rocksdb_readoptions_t *readoptions; rocksdb_writebatch_t *writebatch; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 69eacfa46e84326ef907c924e044bd035bd9f0d5..94e5f253bfb73323f14c649f5356b7ed6df70786 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -178,6 +178,7 @@ int tsdbClose(STsdb** pTsdb); int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbPrepareCommit(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); +int32_t tsdbCacheCommit(STsdb* pTsdb); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbFinishCommit(STsdb* pTsdb); int32_t tsdbRollbackCommit(STsdb* pTsdb); @@ -194,9 +195,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, - int32_t type); + int32_t type); int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); -int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. +int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index d5c3b2a789d8324ea5af583d5abbd532bb03bb89..17aed622413d5979c9dcd0b2e1c948a74b2bd965 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -98,12 +98,19 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { goto _err3; } + rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create(); + if (NULL == flushoptions) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err4; + } + rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); pTsdb->rCache.writebatch = writebatch; pTsdb->rCache.options = options; pTsdb->rCache.writeoptions = writeoptions; pTsdb->rCache.readoptions = readoptions; + pTsdb->rCache.flushoptions = flushoptions; pTsdb->rCache.db = db; taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL); @@ -122,6 +129,7 @@ _err: static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_close(pTsdb->rCache.db); + rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions); rocksdb_writebatch_destroy(pTsdb->rCache.writebatch); rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); @@ -129,6 +137,20 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { taosThreadMutexDestroy(&pTsdb->rCache.rMutex); } +int32_t tsdbCacheCommit(STsdb *pTsdb) { + int32_t code = 0; + char *err = NULL; + + rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + code = -1; + } + + return code; +} + SLastCol *tsdbCacheDeserialize(char const *value) { if (!value) { return NULL; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 847125018c5f15baa660dc2f53677e17e18e0d7f..74168591d21ef151b0885e15076dc0842b7ab445 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -144,8 +144,8 @@ _exit: } int vnodeShouldCommit(SVnode *pVnode, bool atExit) { - bool diskAvail = osDataSpaceAvailable(); - bool needCommit = false; + bool diskAvail = osDataSpaceAvailable(); + bool needCommit = false; taosThreadMutexLock(&pVnode->mutex); if (pVnode->inUse && diskAvail) { @@ -439,6 +439,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { code = tsdbCommit(pVnode->pTsdb, pInfo); TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbCacheCommit(pVnode->pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + if (VND_IS_RSMA(pVnode)) { code = smaCommit(pVnode->pSma, pInfo); TSDB_CHECK_CODE(code, lino, _exit);