From b0f65dfa62c59c757d8276b3ceaa42e4e10009c1 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 28 Jul 2023 16:32:07 +0800 Subject: [PATCH] fix(tsdb/cache/del): use lru mutex to fix race with committing --- source/dnode/vnode/src/tsdb/tsdbCache.c | 118 +++--------------------- 1 file changed, 13 insertions(+), 105 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index df6cc1b5e7..2a0729682c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1028,55 +1028,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache return code; } -/* -int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { - int32_t code = 0; - SLRUCache *pCache = pTsdb->lruCache; - SArray *pCidList = pr->pCidList; - int num_keys = TARRAY_SIZE(pCidList); - - for (int i = 0; i < num_keys; ++i) { - SLastCol *pLastCol = NULL; - int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); - - SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; - LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); - if (!h) { - taosThreadMutexLock(&pTsdb->lruMutex); - h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); - if (!h) { - pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype); - - size_t charge = sizeof(*pLastCol); - if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { - charge += pLastCol->colVal.value.nData; - } - - LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h, - TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; - } - } - - taosThreadMutexUnlock(&pTsdb->lruMutex); - } - - pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - - SLastCol lastCol = *pLastCol; - reallocVarData(&lastCol.colVal); - if (h) { - taosLRUCacheRelease(pCache, h, false); - } - - taosArrayPush(pLastArray, &lastCol); - } - - return code; -} -*/ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t code = 0; // fetch schema @@ -1108,6 +1060,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + taosThreadMutexLock(&pTsdb->lruMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex); rocksMayWrite(pTsdb, true, false, false); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, @@ -1171,6 +1124,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE rocksMayWrite(pTsdb, true, false, true); + taosThreadMutexUnlock(&pTsdb->lruMutex); + _exit: taosMemoryFree(pTSchema); @@ -1311,62 +1266,7 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { return code; } -/* -int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { - int32_t code = 0; - char key[32] = {0}; - int keyLen = 0; - // getTableCacheKey(uid, "lr", key, &keyLen); - getTableCacheKey(uid, 0, key, &keyLen); - LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); - if (h) { - SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h); - bool invalidate = false; - int16_t nCol = taosArrayGetSize(pLast); - - for (int16_t iCol = 0; iCol < nCol; ++iCol) { - SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); - if (eKey >= tTsVal->ts) { - invalidate = true; - break; - } - } - - if (invalidate) { - taosLRUCacheRelease(pCache, h, true); - } else { - taosLRUCacheRelease(pCache, h, false); - } - } - - // getTableCacheKey(uid, "l", key, &keyLen); - getTableCacheKey(uid, 1, key, &keyLen); - h = taosLRUCacheLookup(pCache, key, keyLen); - if (h) { - SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h); - bool invalidate = false; - int16_t nCol = taosArrayGetSize(pLast); - - for (int16_t iCol = 0; iCol < nCol; ++iCol) { - SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); - if (eKey >= tTsVal->ts) { - invalidate = true; - break; - } - } - - if (invalidate) { - taosLRUCacheRelease(pCache, h, true); - } else { - taosLRUCacheRelease(pCache, h, false); - } - // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); - } - - return code; -} -*/ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) { int32_t code = 0; STSRow *cacheRow = NULL; @@ -1767,6 +1667,10 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea } if (record.version <= pReader->info.verRange.maxVer) { + /* + tsdbError("tomb xx load/cache: vgId:%d fid:%d commit %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records", + TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid); + */ SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; taosArrayPush(pInfo->pTombData, &delData); } @@ -1977,9 +1881,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie goto _err; } - loadDataTomb(state->pr, state->pr->pFileReader); - state->pr->pCurFileSet = state->pFileSet; + + loadDataTomb(state->pr, state->pr->pFileReader); } if (!state->pIndexList) { @@ -2017,6 +1921,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie state->iBrinIndex = indexSize; } + if (state->pFileSet != state->pr->pCurFileSet) { + state->pr->pCurFileSet = state->pFileSet; + } + code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid, state->pr, state->lastTs, aCols, nCols); if (code != TSDB_CODE_SUCCESS) { -- GitLab