diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 486becdf968e3d77d090c161b814891ec90de501..9baf38bca86ea9bd88945438cc20e4a96888de7f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -305,10 +305,6 @@ void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proa // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(STsdb *pTsdb); -#define TSDB_CACHE_NO(c) ((c).cacheLast == 0) -#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0) -#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0) - // tsdbDiskData ============================================================================================== int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder); void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d7f0ef041a8db9d9e66f3cd8cda6a598b5d5a466..c92cdd32b0b3b4dabe483bd22ca5cf84e6986dd6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -197,7 +197,7 @@ void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); -int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. +int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); @@ -405,6 +405,10 @@ struct SVnode { #define VND_IS_RSMA(v) ((v)->config.isRsma == 1) #define VND_IS_TSMA(v) ((v)->config.isTsma == 1) +#define TSDB_CACHE_NO(c) ((c).cacheLast == 0) +#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0) +#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0) + struct STbUidStore { tb_uid_t suid; SArray* tbUids; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 3890e3d1f7b8f5a09ccc61b1d88aaaf2d29c4a7d..8f0d541d4915bf522bd805f509710dbd53641d14 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -208,10 +208,26 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { taosMemoryFree(pTsdb->rCache.pTSchema); } +static void rocksMayWrite(STsdb *pTsdb, bool force) { + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + + if (force || rocksdb_writebatch_count(wb) >= 1024) { + char *err = NULL; + rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + + rocksdb_writebatch_clear(wb); + } +} + int32_t tsdbCacheCommit(STsdb *pTsdb) { int32_t code = 0; char *err = NULL; + rocksMayWrite(pTsdb, true); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); if (NULL != err) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); @@ -281,22 +297,6 @@ static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t return pLastCol; } -static void rocksMayWrite(STsdb *pTsdb) { - rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; - - int count = rocksdb_writebatch_count(wb); - if (count >= 1024) { - char *err = NULL; - rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } - - rocksdb_writebatch_clear(wb); - } -} - int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t code = 0; @@ -398,7 +398,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - rocksMayWrite(pTsdb); + rocksMayWrite(pTsdb, false); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); _exit: @@ -507,7 +507,7 @@ int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheR } if (wb) { - rocksMayWrite(pTsdb); + rocksMayWrite(pTsdb, false); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); @@ -575,7 +575,7 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl } if (wb) { - rocksMayWrite(pTsdb); + rocksMayWrite(pTsdb, false); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); @@ -672,7 +672,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } if (wb) { - rocksMayWrite(pTsdb); + rocksMayWrite(pTsdb, false); } taosArrayDestroy(pTmpColArray); @@ -734,10 +734,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA code = -1; } - SLastCol lastCol = *pLastCol; - reallocVarData(&lastCol.colVal); + // SLastCol lastCol = *pLastCol; + // reallocVarData(&lastCol.colVal); - taosArraySet(pLastArray, idxKey->idx, &lastCol); + taosArraySet(pLastArray, idxKey->idx, pLastCol); taosArrayRemove(remainCols, j); taosMemoryFree(values_list[i]); @@ -941,7 +941,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - rocksMayWrite(pTsdb); + rocksMayWrite(pTsdb, false); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); _exit: