diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 9baf38bca86ea9bd88945438cc20e4a96888de7f..8fddb9790946e2d0aee2c7d0417d6f271abffd89 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -351,6 +351,7 @@ typedef struct { rocksdb_writeoptions_t *writeoptions; rocksdb_readoptions_t *readoptions; rocksdb_writebatch_t *writebatch; + rocksdb_writebatch_t *rwritebatch; TdThreadMutex rMutex; STSchema *pTSchema; } SRocksCache; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 2692a7316e2d53f49f52ceb26d6b5851701e377d..bbd7663622675c26aa835e5421e61599c6f9c25c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -164,8 +164,10 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { } rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); + rocksdb_writebatch_t *rwritebatch = rocksdb_writebatch_create(); pTsdb->rCache.writebatch = writebatch; + pTsdb->rCache.rwritebatch = rwritebatch; pTsdb->rCache.my_comparator = cmp; pTsdb->rCache.options = options; pTsdb->rCache.writeoptions = writeoptions; @@ -198,6 +200,7 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_close(pTsdb->rCache.db); rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions); rocksdb_writebatch_destroy(pTsdb->rCache.writebatch); + rocksdb_writebatch_destroy(pTsdb->rCache.rwritebatch); rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); rocksdb_options_destroy(pTsdb->rCache.options); @@ -208,14 +211,18 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { taosMemoryFree(pTsdb->rCache.pTSchema); } -static void rocksMayWrite(STsdb *pTsdb, bool force) { +static void rocksMayWrite(STsdb *pTsdb, bool force, bool read) { rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; - - if (force || rocksdb_writebatch_count(wb) >= 1024) { + if (read) { + wb = pTsdb->rCache.rwritebatch; + } + int count = rocksdb_writebatch_count(wb); + if ((force && count > 0) || count >= 1024) { char *err = NULL; rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count, + err); rocksdb_free(err); } @@ -227,7 +234,8 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { int32_t code = 0; char *err = NULL; - rocksMayWrite(pTsdb, true); + rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true, true); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); if (NULL != err) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); @@ -453,7 +461,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - rocksMayWrite(pTsdb, false); + rocksMayWrite(pTsdb, false, false); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); _exit: @@ -537,7 +545,7 @@ int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheR } // store result back to rocks cache - wb = pTsdb->rCache.writebatch; + wb = pTsdb->rCache.rwritebatch; char *value = NULL; size_t vlen = 0; tsdbCacheSerialize(pLastCol, &value, &vlen); @@ -552,7 +560,7 @@ int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheR } if (wb) { - rocksMayWrite(pTsdb, false); + rocksMayWrite(pTsdb, false, true); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); @@ -602,7 +610,7 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl } // store result back to rocks cache - wb = pTsdb->rCache.writebatch; + wb = pTsdb->rCache.rwritebatch; char *value = NULL; size_t vlen = 0; tsdbCacheSerialize(pLastCol, &value, &vlen); @@ -620,7 +628,7 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl } if (wb) { - rocksMayWrite(pTsdb, false); + rocksMayWrite(pTsdb, false, true); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); @@ -694,7 +702,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } // store result back to rocks cache - wb = pTsdb->rCache.writebatch; + wb = pTsdb->rCache.rwritebatch; char *value = NULL; size_t vlen = 0; tsdbCacheSerialize(pLastCol, &value, &vlen); @@ -706,7 +714,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } if (wb) { - rocksMayWrite(pTsdb, false); + rocksMayWrite(pTsdb, false, true); } taosArrayDestroy(pTmpColArray); @@ -936,7 +944,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); taosThreadMutexLock(&pTsdb->rCache.rMutex); - rocksMayWrite(pTsdb, true); + rocksMayWrite(pTsdb, true, false); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, keys_list_sizes, values_list, values_list_sizes, errs); for (int i = 0; i < num_keys; ++i) { @@ -975,7 +983,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - rocksMayWrite(pTsdb, true); + rocksMayWrite(pTsdb, true, false); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); _exit: