提交 8f210903 编写于 作者: M Minglei Jin

cache/delete: just remove affected cache entries

上级 9918d326
......@@ -809,6 +809,8 @@ int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, char const *lstring);
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
......
......@@ -12,7 +12,6 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
static int32_t tsdbOpenBICache(STsdb *pTsdb) {
......@@ -101,14 +100,14 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL);
pTsdb->rCache.writebatch = writebatch;
pTsdb->rCache.options = options;
pTsdb->rCache.writeoptions = writeoptions;
pTsdb->rCache.readoptions = readoptions;
pTsdb->rCache.db = db;
taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL);
return code;
_err4:
......@@ -127,6 +126,7 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
rocksdb_options_destroy(pTsdb->rCache.options);
taosThreadMutexDestroy(&pTsdb->rCache.rMutex);
}
SLastCol *tsdbCacheDeserialize(char const *value) {
......@@ -208,14 +208,14 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
for (int i = 0; i < num_keys; ++i) {
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
int16_t cid = pColVal->cid;
char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN);
int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pColVal->cid);
int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid);
if (last_key_len >= ROCKS_KEY_LEN) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
}
int lr_key_len =
snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pColVal->cid);
int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid);
if (lr_key_len >= ROCKS_KEY_LEN) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
}
......@@ -227,6 +227,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
keys_list_sizes, values_list, values_list_sizes, errs);
for (int i = 0; i < num_keys; ++i) {
......@@ -260,7 +261,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
if (!COL_VAL_IS_NONE(pColVal)) {
SLastCol *pLastCol = NULL;
if (NULL != values_list[i + num_keys]) {
pLastCol = tsdbCacheDeserialize(values_list[i]);
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
}
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
......@@ -286,6 +287,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_clear(wb);
_exit:
......@@ -339,6 +341,98 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
return code;
}
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
int32_t code = 0;
// 1, fetch schema
STSchema *pTSchema = NULL;
int32_t sver = -1;
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return -1;
}
// 3, build keys & multi get from rocks
int num_keys = pTSchema->numOfCols;
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
for (int i = 0; i < num_keys; ++i) {
int16_t cid = pTSchema->columns[i].colId;
char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN);
int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid);
if (last_key_len >= ROCKS_KEY_LEN) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
}
int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid);
if (lr_key_len >= ROCKS_KEY_LEN) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
}
keys_list[i] = keys;
keys_list[num_keys + i] = keys + ROCKS_KEY_LEN;
keys_list_sizes[i] = last_key_len;
keys_list_sizes[num_keys + i] = lr_key_len;
}
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
keys_list_sizes, values_list, values_list_sizes, errs);
for (int i = 0; i < num_keys; ++i) {
taosMemoryFree(keys_list[i]);
}
for (int i = 0; i < num_keys * 2; ++i) {
rocksdb_free(errs[i]);
}
taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(errs);
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = NULL;
if (NULL != values_list[i]) {
pLastCol = tsdbCacheDeserialize(values_list[i]);
}
if (NULL != pLastCol || (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
char key[ROCKS_KEY_LEN];
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pLastCol->colVal.cid);
rocksdb_writebatch_delete(wb, key, klen);
}
if (NULL != values_list[i + num_keys]) {
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
}
if (NULL != pLastCol || (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
char key[ROCKS_KEY_LEN];
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pLastCol->colVal.cid);
rocksdb_writebatch_delete(wb, key, klen);
}
rocksdb_free(values_list[i]);
rocksdb_free(values_list[i + num_keys]);
}
taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes);
char *err = NULL;
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_clear(wb);
_exit:
taosMemoryFree(pTSchema);
return code;
}
int32_t tsdbOpenCache(STsdb *pTsdb) {
int32_t code = 0;
SLRUCache *pCache = NULL;
......@@ -469,7 +563,7 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
return code;
}
/*
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
char key[32] = {0};
......@@ -524,7 +618,7 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
return code;
}
*/
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) {
int32_t code = 0;
STSRow *cacheRow = NULL;
......
......@@ -288,7 +288,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, lstring);
if (TARRAY_SIZE(pRow) <= 0) {
taosArrayDestroy(pRow);
taosArrayClear(pRow);
continue;
}
......@@ -357,7 +357,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, lstring);
if (TARRAY_SIZE(pRow) <= 0) {
taosArrayDestroy(pRow);
taosArrayClear(pRow);
continue;
}
......
......@@ -140,7 +140,6 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
SMemTable *pMemTable = pTsdb->mem;
STbData *pTbData = NULL;
SVBufPool *pPool = pTsdb->pVnode->inUse;
TSDBKEY lastKey = {.version = version, .ts = eKey};
// check if table exists
SMetaInfo info;
......@@ -181,7 +180,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
pMemTable->nDel++;
pMemTable->minVer = TMIN(pMemTable->minVer, version);
pMemTable->maxVer = TMIN(pMemTable->maxVer, version);
/*
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
}
......@@ -189,6 +188,10 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
}
*/
if (eKey >= pTbData->maxKey && sKey <= pTbData->maxKey) {
tsdbCacheDel(pTsdb, suid, uid, sKey, eKey);
}
tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
" at version %" PRId64,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册