diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 8f0d541d4915bf522bd805f509710dbd53641d14..2eddc29c0e35c393b241d4e0c86274ac9203aad3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -297,6 +297,31 @@ static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t return pLastCol; } +static void reallocVarData(SColVal *pColVal) { + if (IS_VAR_DATA_TYPE(pColVal->type)) { + uint8_t *pVal = pColVal->value.pData; + pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); + if (pColVal->value.nData) { + memcpy(pColVal->value.pData, pVal, pColVal->value.nData); + } + } +} + +static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) { + SLastCol *pLastCol = (SLastCol *)value; + + // TODO: add dirty flag to SLastCol + if (pLastCol->dirty) { + // TODO: queue into dirty list, free it after save to backstore + } else { + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) { + taosMemoryFree(pLastCol->colVal.value.pData); + } + + taosMemoryFree(value); + } +} + int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t code = 0; @@ -373,6 +398,24 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; size_t klen = ROCKS_KEY_LEN; rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen); + + pLastCol = (SLastCol *)value; + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + reallocVarData(&pLastCol->colVal); + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, + NULL, TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + taosMemoryFree(value); } @@ -384,9 +427,26 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow size_t vlen = 0; tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; - size_t klen = ROCKS_KEY_LEN; - rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen); + rocksdb_writebatch_put(wb, (char *)&key, ROCKS_KEY_LEN, value, vlen); + + pLastCol = (SLastCol *)value; + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + reallocVarData(&pLastCol->colVal); + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge, + tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + taosMemoryFree(value); } } @@ -407,16 +467,6 @@ _exit: return code; } -static void reallocVarData(SColVal *pColVal) { - if (IS_VAR_DATA_TYPE(pColVal->type)) { - uint8_t *pVal = pColVal->value.pData; - pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); - if (pColVal->value.nData) { - memcpy(pColVal->value.pData, pVal, pColVal->value.nData); - } - } -} - static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); @@ -584,21 +634,6 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl return pLastCol; } -static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) { - SLastCol *pLastCol = (SLastCol *)value; - - // TODO: add dirty flag to SLastCol - if (pLastCol->dirty) { - // TODO: queue into dirty list, free it after save to backstore - } else { - if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) { - taosMemoryFree(pLastCol->colVal.value.pData); - } - - taosMemoryFree(value); - } -} - typedef struct { int idx; SLastKey key; @@ -734,9 +769,6 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA code = -1; } - // SLastCol lastCol = *pLastCol; - // reallocVarData(&lastCol.colVal); - taosArraySet(pLastArray, idxKey->idx, pLastCol); taosArrayRemove(remainCols, j); @@ -875,16 +907,19 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t code = 0; - // 1, fetch schema - STSchema *pTSchema = NULL; - int32_t sver = -1; - code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; + // fetch schema + STSchema *pTSchema = pTsdb->rCache.pTSchema; + if (!pTSchema) { + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + + pTsdb->rCache.pTSchema = pTSchema; } - // 3, build keys & multi get from rocks + // build keys & multi get from rocks int num_keys = pTSchema->numOfCols; char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); @@ -925,6 +960,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE size_t klen = sizeof(*key); rocksdb_writebatch_delete(wb, (char *)key, klen); + taosLRUCacheErase(pTsdb->lruCache, key, klen); } pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); @@ -933,6 +969,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE size_t klen = sizeof(*key); rocksdb_writebatch_delete(wb, (char *)key, klen); + taosLRUCacheErase(pTsdb->lruCache, key, klen); } rocksdb_free(values_list[i]);