From d04a33f349f34f6ae49a56f53dfb61c1e302cd94 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 14 Jun 2023 15:35:21 +0800 Subject: [PATCH] fix(tsdb/cache): rewrite cache update to fix cpu usage --- source/dnode/vnode/src/tsdb/tsdbCache.c | 227 +++++++++++++++--------- 1 file changed, 140 insertions(+), 87 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 4ec66f82a6..404ee33dc8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -344,6 +344,11 @@ static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) { } } +typedef struct { + int idx; + SLastKey key; +} SIdxKey; + int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t code = 0; @@ -370,113 +375,166 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow tsdbRowClose(&iter); // 3, build keys & multi get from rocks - int num_keys = TARRAY_SIZE(aColVal); - char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); - size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); - char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN * 2); - for (int i = 0; i < num_keys; ++i) { - SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); - int16_t cid = pColVal->cid; - - memcpy(key_list + i * ROCKS_KEY_LEN, &(SLastKey){.ltype = 1, .uid = uid, .cid = cid}, ROCKS_KEY_LEN); - memcpy(key_list + i * ROCKS_KEY_LEN + num_keys * ROCKS_KEY_LEN, &(SLastKey){.ltype = 0, .uid = uid, .cid = cid}, - ROCKS_KEY_LEN); - keys_list[i] = key_list + i * ROCKS_KEY_LEN; - keys_list[num_keys + i] = key_list + i * ROCKS_KEY_LEN + num_keys * ROCKS_KEY_LEN; - keys_list_sizes[i] = ROCKS_KEY_LEN; - keys_list_sizes[num_keys + i] = ROCKS_KEY_LEN; - } - char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); - size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); - char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); - taosThreadMutexLock(&pTsdb->rCache.rMutex); - rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, - keys_list_sizes, values_list, values_list_sizes, errs); - for (int i = 0; i < num_keys * 2; ++i) { - rocksdb_free(errs[i]); - } - taosMemoryFree(key_list); - taosMemoryFree(keys_list); - taosMemoryFree(keys_list_sizes); - taosMemoryFree(errs); - + int num_keys = TARRAY_SIZE(aColVal); TSKEY keyTs = TSDBROW_TS(pRow); rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + + SArray *oColVal = taosArrayInit(num_keys, sizeof(SColVal)); + SArray *remainCols = NULL; + SLRUCache *pCache = pTsdb->lruCache; + + taosThreadMutexLock(&pTsdb->lruMutex); for (int i = 0; i < num_keys; ++i) { SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); + int16_t cid = pColVal->cid; - // if (!COL_VAL_IS_NONE(pColVal)) { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); - - if (NULL == pLastCol || pLastCol->ts <= keyTs) { - char *value = NULL; - size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); - SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; - size_t klen = ROCKS_KEY_LEN; - rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen); + SLastKey *key = &(SLastKey){.ltype = 0, .uid = uid, .cid = cid}; + size_t klen = ROCKS_KEY_LEN; + LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - pLastCol = (SLastCol *)value; - SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); - *pTmpLastCol = *pLastCol; - pLastCol = pTmpLastCol; + if (pLastCol->ts <= keyTs) { + uint8_t *pVal = NULL; + int nData = pLastCol->colVal.value.nData; + if (IS_VAR_DATA_TYPE(pColVal->type)) { + pVal = pLastCol->colVal.value.pData; + } + pLastCol->ts = keyTs; + pLastCol->colVal = *pColVal; + if (IS_VAR_DATA_TYPE(pColVal->type)) { + if (nData < pColVal->value.nData) { + taosMemoryFree(pVal); + pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData); + } else { + pLastCol->colVal.value.pData = pVal; + } + if (pColVal->value.nData) { + memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + } - reallocVarData(&pLastCol->colVal); - size_t charge = sizeof(*pLastCol); - if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { - charge += pLastCol->colVal.value.nData; + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &value, &vlen); + // tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); + taosMemoryFree(value); } - LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, - NULL, TAOS_LRU_PRIORITY_LOW); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; + taosLRUCacheRelease(pCache, h, false); + } else { + if (!remainCols) { + remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); } - - taosMemoryFree(value); + taosArrayPush(remainCols, &(SIdxKey){i, *key}); } if (COL_VAL_IS_VALUE(pColVal)) { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); - - if (NULL == pLastCol || pLastCol->ts <= keyTs) { - char *value = NULL; - size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); - SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; - - rocksdb_writebatch_put(wb, (char *)&key, ROCKS_KEY_LEN, value, vlen); + key->ltype = 1; + LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - pLastCol = (SLastCol *)value; - SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); - *pTmpLastCol = *pLastCol; - pLastCol = pTmpLastCol; + if (pLastCol->ts <= keyTs) { + uint8_t *pVal = NULL; + int nData = pLastCol->colVal.value.nData; + if (IS_VAR_DATA_TYPE(pColVal->type)) { + pVal = pLastCol->colVal.value.pData; + } + pLastCol->ts = keyTs; + pLastCol->colVal = *pColVal; + if (IS_VAR_DATA_TYPE(pColVal->type)) { + if (nData < pColVal->value.nData) { + taosMemoryFree(pVal); + pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData); + } else { + pLastCol->colVal.value.pData = pVal; + } + if (pColVal->value.nData) { + memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + } - reallocVarData(&pLastCol->colVal); - size_t charge = sizeof(*pLastCol); - if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { - charge += pLastCol->colVal.value.nData; + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &value, &vlen); + rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); + taosMemoryFree(value); } - LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, - NULL, TAOS_LRU_PRIORITY_LOW); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; + taosLRUCacheRelease(pCache, h, false); + } else { + if (!remainCols) { + remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); } + taosArrayPush(remainCols, &(SIdxKey){i, *key}); + } + } + } - taosMemoryFree(value); + num_keys = TARRAY_SIZE(remainCols); + if (remainCols && num_keys > 0) { + char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *)); + size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); + for (int i = 0; i < num_keys; ++i) { + SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; + + keys_list[i] = (char *)&idxKey->key; + keys_list_sizes[i] = ROCKS_KEY_LEN; + } + char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); + size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); + char **errs = taosMemoryCalloc(num_keys, sizeof(char *)); + rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, + keys_list_sizes, values_list, values_list_sizes, errs); + for (int i = 0; i < num_keys; ++i) { + rocksdb_free(errs[i]); + } + taosMemoryFree(errs); + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(values_list_sizes); + + for (int i = 0; i < num_keys; ++i) { + SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; + SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx); + + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + + if (idxKey->key.ltype == 0) { + if (NULL == pLastCol || pLastCol->ts <= keyTs) { + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + // SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; + rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); + taosMemoryFree(value); + } + } else { + if (COL_VAL_IS_VALUE(pColVal)) { + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + + if (NULL == pLastCol || pLastCol->ts <= keyTs) { + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + // SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; + rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); + taosMemoryFree(value); + } + } } + + rocksdb_free(values_list[i]); } - //} + taosMemoryFree(values_list); - rocksdb_free(values_list[i]); - rocksdb_free(values_list[i + num_keys]); + taosArrayDestroy(remainCols); } - taosMemoryFree(values_list); - taosMemoryFree(values_list_sizes); rocksMayWrite(pTsdb, true, false, false); - taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + taosThreadMutexUnlock(&pTsdb->lruMutex); _exit: taosArrayDestroy(aColVal); @@ -651,11 +709,6 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl return pLastCol; } -typedef struct { - int idx; - SLastKey key; -} SIdxKey; - static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols, SCacheRowsReader *pr, int8_t ltype) { int32_t code = 0; -- GitLab