提交 f428a68e 编写于 作者: M Minglei Jin

cache/update: update lru when deleting or updating

上级 b892b1fd
...@@ -297,6 +297,31 @@ static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ...@@ -297,6 +297,31 @@ static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t
return pLastCol; return pLastCol;
} }
static void reallocVarData(SColVal *pColVal) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData);
if (pColVal->value.nData) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
}
}
}
static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) {
SLastCol *pLastCol = (SLastCol *)value;
// TODO: add dirty flag to SLastCol
if (pLastCol->dirty) {
// TODO: queue into dirty list, free it after save to backstore
} else {
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) {
taosMemoryFree(pLastCol->colVal.value.pData);
}
taosMemoryFree(value);
}
}
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
int32_t code = 0; int32_t code = 0;
...@@ -373,6 +398,24 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -373,6 +398,24 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
size_t klen = ROCKS_KEY_LEN; size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen); rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen);
pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
taosMemoryFree(value); taosMemoryFree(value);
} }
...@@ -384,9 +427,26 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -384,9 +427,26 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen); rocksdb_writebatch_put(wb, (char *)&key, ROCKS_KEY_LEN, value, vlen);
pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge,
tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
taosMemoryFree(value); taosMemoryFree(value);
} }
} }
...@@ -407,16 +467,6 @@ _exit: ...@@ -407,16 +467,6 @@ _exit:
return code; return code;
} }
static void reallocVarData(SColVal *pColVal) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData);
if (pColVal->value.nData) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
}
}
}
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
int nCols, int16_t *slotIds); int nCols, int16_t *slotIds);
...@@ -584,21 +634,6 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl ...@@ -584,21 +634,6 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl
return pLastCol; return pLastCol;
} }
static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) {
SLastCol *pLastCol = (SLastCol *)value;
// TODO: add dirty flag to SLastCol
if (pLastCol->dirty) {
// TODO: queue into dirty list, free it after save to backstore
} else {
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) {
taosMemoryFree(pLastCol->colVal.value.pData);
}
taosMemoryFree(value);
}
}
typedef struct { typedef struct {
int idx; int idx;
SLastKey key; SLastKey key;
...@@ -734,9 +769,6 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA ...@@ -734,9 +769,6 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
code = -1; code = -1;
} }
// SLastCol lastCol = *pLastCol;
// reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, pLastCol); taosArraySet(pLastArray, idxKey->idx, pLastCol);
taosArrayRemove(remainCols, j); taosArrayRemove(remainCols, j);
...@@ -875,16 +907,19 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR ...@@ -875,16 +907,19 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
int32_t code = 0; int32_t code = 0;
// 1, fetch schema // fetch schema
STSchema *pTSchema = NULL; STSchema *pTSchema = pTsdb->rCache.pTSchema;
int32_t sver = -1; if (!pTSchema) {
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
return -1; return -1;
}
pTsdb->rCache.pTSchema = pTSchema;
} }
// 3, build keys & multi get from rocks // build keys & multi get from rocks
int num_keys = pTSchema->numOfCols; int num_keys = pTSchema->numOfCols;
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
...@@ -925,6 +960,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ...@@ -925,6 +960,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
size_t klen = sizeof(*key); size_t klen = sizeof(*key);
rocksdb_writebatch_delete(wb, (char *)key, klen); rocksdb_writebatch_delete(wb, (char *)key, klen);
taosLRUCacheErase(pTsdb->lruCache, key, klen);
} }
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
...@@ -933,6 +969,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ...@@ -933,6 +969,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
size_t klen = sizeof(*key); size_t klen = sizeof(*key);
rocksdb_writebatch_delete(wb, (char *)key, klen); rocksdb_writebatch_delete(wb, (char *)key, klen);
taosLRUCacheErase(pTsdb->lruCache, key, klen);
} }
rocksdb_free(values_list[i]); rocksdb_free(values_list[i]);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册