diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 1fae726bbb991ab19ebcb3b956d474fe4ddd79f3..ddcb432caaf387339450298b6325f68ca06c1a18 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -211,10 +211,18 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { taosMemoryFree(pTsdb->rCache.pTSchema); } -static void rocksMayWrite(STsdb *pTsdb, bool force, bool read) { - rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; +static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { + rocksdb_writebatch_t *wb = NULL; if (read) { + if (lock) { + taosThreadMutexLock(&pTsdb->lruMutex); + } wb = pTsdb->rCache.rwritebatch; + } else { + if (lock) { + taosThreadMutexLock(&pTsdb->rCache.rMutex); + } + wb = pTsdb->rCache.writebatch; } int count = rocksdb_writebatch_count(wb); if ((force && count > 0) || count >= 1024) { @@ -228,14 +236,19 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read) { rocksdb_writebatch_clear(wb); } + if (read) { + if (lock) taosThreadMutexUnlock(&pTsdb->lruMutex); + } else { + if (lock) taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + } } int32_t tsdbCacheCommit(STsdb *pTsdb) { int32_t code = 0; char *err = NULL; - rocksMayWrite(pTsdb, true, false); - rocksMayWrite(pTsdb, true, true); + rocksMayWrite(pTsdb, true, false, true); + rocksMayWrite(pTsdb, true, true, true); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); if (NULL != err) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); @@ -461,7 +474,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - rocksMayWrite(pTsdb, false, false); + rocksMayWrite(pTsdb, false, false, false); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); _exit: @@ -560,7 +573,7 @@ int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheR } if (wb) { - rocksMayWrite(pTsdb, false, true); + rocksMayWrite(pTsdb, false, true, false); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); @@ -628,7 +641,7 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl } if (wb) { - rocksMayWrite(pTsdb, false, true); + rocksMayWrite(pTsdb, false, true, false); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); @@ -710,7 +723,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } if (wb) { - rocksMayWrite(pTsdb, false, true); + rocksMayWrite(pTsdb, false, true, false); } taosArrayDestroy(pTmpColArray); @@ -940,7 +953,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); taosThreadMutexLock(&pTsdb->rCache.rMutex); - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true, false, false); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, keys_list_sizes, values_list, values_list_sizes, errs); for (int i = 0; i < num_keys; ++i) { @@ -979,7 +992,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true, false, false); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); _exit: