提交 81048f93 编写于 作者: M Minglei Jin

fix(cache): sychronize commit, read and write threads with writebatch

上级 75ccd024
......@@ -211,10 +211,18 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
taosMemoryFree(pTsdb->rCache.pTSchema);
}
static void rocksMayWrite(STsdb *pTsdb, bool force, bool read) {
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
rocksdb_writebatch_t *wb = NULL;
if (read) {
if (lock) {
taosThreadMutexLock(&pTsdb->lruMutex);
}
wb = pTsdb->rCache.rwritebatch;
} else {
if (lock) {
taosThreadMutexLock(&pTsdb->rCache.rMutex);
}
wb = pTsdb->rCache.writebatch;
}
int count = rocksdb_writebatch_count(wb);
if ((force && count > 0) || count >= 1024) {
......@@ -228,14 +236,19 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read) {
rocksdb_writebatch_clear(wb);
}
if (read) {
if (lock) taosThreadMutexUnlock(&pTsdb->lruMutex);
} else {
if (lock) taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
}
}
int32_t tsdbCacheCommit(STsdb *pTsdb) {
int32_t code = 0;
char *err = NULL;
rocksMayWrite(pTsdb, true, false);
rocksMayWrite(pTsdb, true, true);
rocksMayWrite(pTsdb, true, false, true);
rocksMayWrite(pTsdb, true, true, true);
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
......@@ -461,7 +474,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes);
rocksMayWrite(pTsdb, false, false);
rocksMayWrite(pTsdb, false, false, false);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
_exit:
......@@ -560,7 +573,7 @@ int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheR
}
if (wb) {
rocksMayWrite(pTsdb, false, true);
rocksMayWrite(pTsdb, false, true, false);
}
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
......@@ -628,7 +641,7 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl
}
if (wb) {
rocksMayWrite(pTsdb, false, true);
rocksMayWrite(pTsdb, false, true, false);
}
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
......@@ -710,7 +723,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
}
if (wb) {
rocksMayWrite(pTsdb, false, true);
rocksMayWrite(pTsdb, false, true, false);
}
taosArrayDestroy(pTmpColArray);
......@@ -940,7 +953,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksMayWrite(pTsdb, true, false);
rocksMayWrite(pTsdb, true, false, false);
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
keys_list_sizes, values_list, values_list_sizes, errs);
for (int i = 0; i < num_keys; ++i) {
......@@ -979,7 +992,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes);
rocksMayWrite(pTsdb, true, false);
rocksMayWrite(pTsdb, true, false, false);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
_exit:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册