diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index a739140200e5e33f8665ab09592487adc139cc05..31b13b8411127cfc742ed78bfcc56191126b93fc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -215,7 +215,7 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { } static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { - rocksdb_writebatch_t *wb = NULL; + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; if (read) { if (lock) { taosThreadMutexLock(&pTsdb->lruMutex); @@ -225,11 +225,12 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { if (lock) { taosThreadMutexLock(&pTsdb->rCache.rMutex); } - wb = pTsdb->rCache.writebatch; } + int count = rocksdb_writebatch_count(wb); if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) { char *err = NULL; + rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); if (NULL != err) { tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count, @@ -240,10 +241,13 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { rocksdb_writebatch_clear(wb); } - if (read) { - if (lock) taosThreadMutexUnlock(&pTsdb->lruMutex); - } else { - if (lock) taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + + if (lock) { + if (read) { + taosThreadMutexUnlock(&pTsdb->lruMutex); + } else { + taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + } } } @@ -287,36 +291,45 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { *size = length; } -int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { - SLastCol *pLastCol = (SLastCol *)value; +static void tsdbCachePutBatch(SLastCol *pLastCol, const void *key, size_t klen, SCacheFlushState *state) { + STsdb *pTsdb = state->pTsdb; + SRocksCache *rCache = &pTsdb->rCache; + rocksdb_writebatch_t *wb = rCache->writebatch; + char *rocks_value = NULL; + size_t vlen = 0; - if (pLastCol->dirty) { - SCacheFlushState *state = (SCacheFlushState *)ud; - STsdb *pTsdb = state->pTsdb; - SRocksCache *rCache = &pTsdb->rCache; - rocksdb_writebatch_t *wb = rCache->writebatch; - char *rocks_value = NULL; - size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &rocks_value, &vlen); - tsdbCacheSerialize(pLastCol, &rocks_value, &vlen); - rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen); + taosThreadMutexLock(&rCache->rMutex); - taosMemoryFree(rocks_value); + rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen); - if (++state->flush_count >= ROCKS_BATCH_SIZE) { - char *err = NULL; - rocksdb_write(rCache->db, rCache->writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - state->flush_count, err); - rocksdb_free(err); - } + taosMemoryFree(rocks_value); - rocksdb_writebatch_clear(wb); + if (++state->flush_count >= ROCKS_BATCH_SIZE) { + char *err = NULL; - state->flush_count = 0; + rocksdb_write(rCache->db, rCache->writeoptions, wb, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, + state->flush_count, err); + rocksdb_free(err); } + rocksdb_writebatch_clear(wb); + + state->flush_count = 0; + } + + taosThreadMutexUnlock(&rCache->rMutex); +} + +int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { + SLastCol *pLastCol = (SLastCol *)value; + + if (pLastCol->dirty) { + tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud); + pLastCol->dirty = 0; } @@ -379,36 +392,10 @@ static void reallocVarData(SColVal *pColVal) { } static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) { - (void)key; - (void)klen; SLastCol *pLastCol = (SLastCol *)value; if (pLastCol->dirty) { - SCacheFlushState *state = (SCacheFlushState *)ud; - STsdb *pTsdb = state->pTsdb; - SRocksCache *rCache = &pTsdb->rCache; - rocksdb_writebatch_t *wb = rCache->writebatch; - char *rocks_value = NULL; - size_t vlen = 0; - - tsdbCacheSerialize(pLastCol, &rocks_value, &vlen); - rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen); - - taosMemoryFree(rocks_value); - - if (++state->flush_count >= ROCKS_BATCH_SIZE) { - char *err = NULL; - rocksdb_write(rCache->db, rCache->writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - state->flush_count, err); - rocksdb_free(err); - } - - rocksdb_writebatch_clear(wb); - - state->flush_count = 0; - } + tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud); } if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) /* && pLastCol->colVal.value.nData > 0*/) { @@ -449,11 +436,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow tsdbRowClose(&iter); // 3, build keys & multi get from rocks - int num_keys = TARRAY_SIZE(aColVal); - TSKEY keyTs = TSDBROW_TS(pRow); - rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; - SArray *remainCols = NULL; - SLRUCache *pCache = pTsdb->lruCache; + int num_keys = TARRAY_SIZE(aColVal); + TSKEY keyTs = TSDBROW_TS(pRow); + SArray *remainCols = NULL; + SLRUCache *pCache = pTsdb->lruCache; taosThreadMutexLock(&pTsdb->lruMutex); for (int i = 0; i < num_keys; ++i) { @@ -489,14 +475,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow if (!pLastCol->dirty) { pLastCol->dirty = 1; } - /* - char *value = NULL; - size_t vlen = 0; - tsdbCacheSerialize(pLastCol, &value, &vlen); - // tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); - rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); - taosMemoryFree(value); - */ } taosLRUCacheRelease(pCache, h, false); @@ -536,13 +514,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow if (!pLastCol->dirty) { pLastCol->dirty = 1; } - /* - char *value = NULL; - size_t vlen = 0; - tsdbCacheSerialize(pLastCol, &value, &vlen); - rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); - taosMemoryFree(value); - */ } taosLRUCacheRelease(pCache, h, false); @@ -580,6 +551,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosMemoryFree(keys_list_sizes); taosMemoryFree(values_list_sizes); + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx; @@ -593,8 +565,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow size_t vlen = 0; tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); // SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; + taosThreadMutexLock(&pTsdb->rCache.rMutex); + rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); + taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + pLastCol = (SLastCol *)value; SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); *pTmpLastCol = *pLastCol; @@ -621,8 +597,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow size_t vlen = 0; tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); // SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; + taosThreadMutexLock(&pTsdb->rCache.rMutex); + rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); + taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + pLastCol = (SLastCol *)value; SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); *pTmpLastCol = *pLastCol; @@ -647,12 +627,14 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow rocksdb_free(values_list[i]); } + + rocksMayWrite(pTsdb, true, false, true); + taosMemoryFree(values_list); taosArrayDestroy(remainCols); } - rocksMayWrite(pTsdb, true, false, false); taosThreadMutexUnlock(&pTsdb->lruMutex); _exit: @@ -1005,9 +987,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache reallocVarData(&lastCol.colVal); taosArrayPush(pLastArray, &lastCol); - if (h) { - taosLRUCacheRelease(pCache, h, false); - } + taosLRUCacheRelease(pCache, h, false); } else { SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; @@ -1032,9 +1012,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache reallocVarData(&lastCol.colVal); taosArraySet(pLastArray, idxKey->idx, &lastCol); - if (h) { - taosLRUCacheRelease(pCache, h, false); - } + taosLRUCacheRelease(pCache, h, false); taosArrayRemove(remainCols, i); } else { @@ -1137,6 +1115,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE rocksMayWrite(pTsdb, true, false, false); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, keys_list_sizes, values_list, values_list_sizes, errs); + taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + for (int i = 0; i < num_keys * 2; ++i) { if (errs[i]) { rocksdb_free(errs[i]); @@ -1147,19 +1127,42 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + taosThreadMutexLock(&pTsdb->rCache.rMutex); if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[i], klen); } - taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen); - pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); } - taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen); + taosThreadMutexUnlock(&pTsdb->rCache.rMutex); rocksdb_free(values_list[i]); rocksdb_free(values_list[i + num_keys]); + + taosThreadMutexLock(&pTsdb->lruMutex); + + LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h); + if (pLastCol->dirty) { + pLastCol->dirty = 0; + } + taosLRUCacheRelease(pTsdb->lruCache, h, true); + } + taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen); + + h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[num_keys + i], klen); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h); + if (pLastCol->dirty) { + pLastCol->dirty = 0; + } + taosLRUCacheRelease(pTsdb->lruCache, h, true); + } + taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen); + + taosThreadMutexUnlock(&pTsdb->lruMutex); } for (int i = 0; i < num_keys; ++i) { taosMemoryFree(keys_list[i]); @@ -1169,8 +1172,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - rocksMayWrite(pTsdb, true, false, false); - taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + rocksMayWrite(pTsdb, true, false, true); _exit: taosMemoryFree(pTSchema); @@ -1183,7 +1185,7 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { SLRUCache *pCache = NULL; size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024; - pCache = taosLRUCacheInit(cfgCapacity, 1, .5); + pCache = taosLRUCacheInit(cfgCapacity, 0, .5); if (pCache == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err;