未验证 提交 9e73a693 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #21389 from taosdata/fix/TD-24306

fix(cache/commit): skip batch write if empty
...@@ -351,6 +351,7 @@ typedef struct { ...@@ -351,6 +351,7 @@ typedef struct {
rocksdb_writeoptions_t *writeoptions; rocksdb_writeoptions_t *writeoptions;
rocksdb_readoptions_t *readoptions; rocksdb_readoptions_t *readoptions;
rocksdb_writebatch_t *writebatch; rocksdb_writebatch_t *writebatch;
rocksdb_writebatch_t *rwritebatch;
TdThreadMutex rMutex; TdThreadMutex rMutex;
STSchema *pTSchema; STSchema *pTSchema;
} SRocksCache; } SRocksCache;
......
...@@ -164,8 +164,10 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { ...@@ -164,8 +164,10 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
} }
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
rocksdb_writebatch_t *rwritebatch = rocksdb_writebatch_create();
pTsdb->rCache.writebatch = writebatch; pTsdb->rCache.writebatch = writebatch;
pTsdb->rCache.rwritebatch = rwritebatch;
pTsdb->rCache.my_comparator = cmp; pTsdb->rCache.my_comparator = cmp;
pTsdb->rCache.options = options; pTsdb->rCache.options = options;
pTsdb->rCache.writeoptions = writeoptions; pTsdb->rCache.writeoptions = writeoptions;
...@@ -198,6 +200,7 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { ...@@ -198,6 +200,7 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
rocksdb_close(pTsdb->rCache.db); rocksdb_close(pTsdb->rCache.db);
rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions); rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
rocksdb_writebatch_destroy(pTsdb->rCache.writebatch); rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
rocksdb_writebatch_destroy(pTsdb->rCache.rwritebatch);
rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
rocksdb_options_destroy(pTsdb->rCache.options); rocksdb_options_destroy(pTsdb->rCache.options);
...@@ -208,14 +211,18 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { ...@@ -208,14 +211,18 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
taosMemoryFree(pTsdb->rCache.pTSchema); taosMemoryFree(pTsdb->rCache.pTSchema);
} }
static void rocksMayWrite(STsdb *pTsdb, bool force) { static void rocksMayWrite(STsdb *pTsdb, bool force, bool read) {
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
if (read) {
if (force || rocksdb_writebatch_count(wb) >= 1024) { wb = pTsdb->rCache.rwritebatch;
}
int count = rocksdb_writebatch_count(wb);
if ((force && count > 0) || count >= 1024) {
char *err = NULL; char *err = NULL;
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) { if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
err);
rocksdb_free(err); rocksdb_free(err);
} }
...@@ -227,7 +234,8 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { ...@@ -227,7 +234,8 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
char *err = NULL; char *err = NULL;
rocksMayWrite(pTsdb, true); rocksMayWrite(pTsdb, true, false);
rocksMayWrite(pTsdb, true, true);
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
if (NULL != err) { if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
...@@ -453,7 +461,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -453,7 +461,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(values_list); taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes); taosMemoryFree(values_list_sizes);
rocksMayWrite(pTsdb, false); rocksMayWrite(pTsdb, false, false);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex); taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
_exit: _exit:
...@@ -537,7 +545,7 @@ int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheR ...@@ -537,7 +545,7 @@ int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheR
} }
// store result back to rocks cache // store result back to rocks cache
wb = pTsdb->rCache.writebatch; wb = pTsdb->rCache.rwritebatch;
char *value = NULL; char *value = NULL;
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen); tsdbCacheSerialize(pLastCol, &value, &vlen);
...@@ -552,7 +560,7 @@ int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheR ...@@ -552,7 +560,7 @@ int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheR
} }
if (wb) { if (wb) {
rocksMayWrite(pTsdb, false); rocksMayWrite(pTsdb, false, true);
} }
taosThreadMutexUnlock(&pTsdb->rCache.rMutex); taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
...@@ -602,7 +610,7 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl ...@@ -602,7 +610,7 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl
} }
// store result back to rocks cache // store result back to rocks cache
wb = pTsdb->rCache.writebatch; wb = pTsdb->rCache.rwritebatch;
char *value = NULL; char *value = NULL;
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen); tsdbCacheSerialize(pLastCol, &value, &vlen);
...@@ -620,7 +628,7 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl ...@@ -620,7 +628,7 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl
} }
if (wb) { if (wb) {
rocksMayWrite(pTsdb, false); rocksMayWrite(pTsdb, false, true);
} }
taosThreadMutexUnlock(&pTsdb->rCache.rMutex); taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
...@@ -694,7 +702,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr ...@@ -694,7 +702,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
} }
// store result back to rocks cache // store result back to rocks cache
wb = pTsdb->rCache.writebatch; wb = pTsdb->rCache.rwritebatch;
char *value = NULL; char *value = NULL;
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen); tsdbCacheSerialize(pLastCol, &value, &vlen);
...@@ -706,7 +714,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr ...@@ -706,7 +714,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
} }
if (wb) { if (wb) {
rocksMayWrite(pTsdb, false); rocksMayWrite(pTsdb, false, true);
} }
taosArrayDestroy(pTmpColArray); taosArrayDestroy(pTmpColArray);
...@@ -936,7 +944,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ...@@ -936,7 +944,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
taosThreadMutexLock(&pTsdb->rCache.rMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksMayWrite(pTsdb, true); rocksMayWrite(pTsdb, true, false);
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
keys_list_sizes, values_list, values_list_sizes, errs); keys_list_sizes, values_list, values_list_sizes, errs);
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
...@@ -975,7 +983,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ...@@ -975,7 +983,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
taosMemoryFree(values_list); taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes); taosMemoryFree(values_list_sizes);
rocksMayWrite(pTsdb, true); rocksMayWrite(pTsdb, true, false);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex); taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
_exit: _exit:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册