From cce557bd3ea816fb0f5e61d32d227ae9e91d532b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 28 Dec 2021 16:11:48 +0800 Subject: [PATCH] handle cache write/reade concurrent problem --- source/libs/index/inc/index_cache.h | 16 ++- source/libs/index/src/index_cache.c | 212 ++++++++++++++++++---------- 2 files changed, 149 insertions(+), 79 deletions(-) diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 0e7405869a..679de3c0a5 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -30,14 +30,18 @@ extern "C" { #endif +typedef struct MemTable { + T_REF_DECLARE() + SSkipList* mem; +} MemTable; typedef struct IndexCache { T_REF_DECLARE() - SSkipList *mem, *imm; - SIndex* index; - char* colName; - int32_t version; - int32_t nTerm; - int8_t type; + MemTable *mem, *imm; + SIndex* index; + char* colName; + int32_t version; + int32_t nTerm; + int8_t type; pthread_mutex_t mtx; } IndexCache; diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 3f99d04bc9..f610ff9a11 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -25,44 +25,20 @@ //#define CACHE_KEY_LEN(p) \ // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType)) -static void cacheTermDestroy(CacheTerm* ct) { - if (ct == NULL) { return; } +void indexMemRef(MemTable* tbl); +void indexMemUnRef(MemTable* tbl); - free(ct->colVal); - free(ct); -} -static char* getIndexKey(const void* pData) { - CacheTerm* p = (CacheTerm*)pData; - return (char*)p; -} +void indexCacheRef(IndexCache* cache); +void indexCacheUnRef(IndexCache* cache); -static int32_t compareKey(const void* l, const void* r) { - CacheTerm* lt = (CacheTerm*)l; - CacheTerm* rt = (CacheTerm*)r; +static void cacheTermDestroy(CacheTerm* ct); +static char* getIndexKey(const void* pData); +static int32_t compareKey(const void* l, const void* r); +static MemTable* indexInternalCacheCreate(int8_t type); - // compare colVal - int i, j; - for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) { - if (lt->colVal[i] == rt->colVal[j]) { - continue; - } else { - return lt->colVal[i] < rt->colVal[j] ? -1 : 1; - } - } - if (i < lt->nColVal) { - return 1; - } else if (j < rt->nColVal) { - return -1; - } - // compare version - return rt->version - lt->version; -} - -static SSkipList* indexInternalCacheCreate(int8_t type) { - if (type == TSDB_DATA_TYPE_BINARY) { - return tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); - } -} +static void doMergeWork(SSchedMsg* msg); +static bool indexCacheIteratorNext(Iterate* itera); +static IterateValue* indexCacheIteratorGetValue(Iterate* iter); IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) { IndexCache* cache = calloc(1, sizeof(IndexCache)); @@ -83,7 +59,15 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) { return cache; } void indexCacheDebug(IndexCache* cache) { - SSkipListIterator* iter = tSkipListCreateIter(cache->mem); + MemTable* tbl = NULL; + + pthread_mutex_lock(&cache->mtx); + tbl = cache->mem; + indexMemRef(tbl); + pthread_mutex_unlock(&cache->mtx); + + SSkipList* slt = tbl->mem; + SSkipListIterator* iter = tSkipListCreateIter(slt); while (tSkipListIterNext(iter)) { SSkipListNode* node = tSkipListIterGet(iter); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); @@ -93,6 +77,8 @@ void indexCacheDebug(IndexCache* cache) { } } tSkipListDestroyIter(iter); + + indexMemUnRef(tbl); } void indexCacheDestroySkiplist(SSkipList* slt) { @@ -103,60 +89,33 @@ void indexCacheDestroySkiplist(SSkipList* slt) { if (ct != NULL) {} } tSkipListDestroyIter(iter); + tSkipListDestroy(slt); } void indexCacheDestroyImm(IndexCache* cache) { + MemTable* tbl = NULL; pthread_mutex_lock(&cache->mtx); - SSkipList* timm = (SSkipList*)cache->imm; + tbl = cache->imm; cache->imm = NULL; // or throw int bg thread pthread_mutex_unlock(&cache->mtx); - - indexCacheDestroySkiplist(timm); + indexMemUnRef(tbl); } void indexCacheDestroy(void* cache) { IndexCache* pCache = cache; if (pCache == NULL) { return; } - tSkipListDestroy(pCache->mem); - tSkipListDestroy(pCache->imm); + indexMemUnRef(pCache->mem); + indexMemUnRef(pCache->imm); free(pCache->colName); free(pCache); } -static void doMergeWork(SSchedMsg* msg) { - IndexCache* pCache = msg->ahandle; - SIndex* sidx = (SIndex*)pCache->index; - indexFlushCacheTFile(sidx, pCache); -} -static bool indexCacheIteratorNext(Iterate* itera) { - SSkipListIterator* iter = itera->iter; - if (iter == NULL) { return false; } - - IterateValue* iv = &itera->val; - iterateValueDestroy(iv, false); - - bool next = tSkipListIterNext(iter); - if (next) { - SSkipListNode* node = tSkipListIterGet(iter); - CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); - - iv->type = ct->operaType; - iv->colVal = ct->colVal; - - taosArrayPush(iv->val, &ct->uid); - } - - return next; -} - -static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { - return &iter->val; -} Iterate* indexCacheIteratorCreate(IndexCache* cache) { Iterate* iiter = calloc(1, sizeof(Iterate)); if (iiter == NULL) { return NULL; } + MemTable* tbl = cache->imm; iiter->val.val = taosArrayInit(1, sizeof(uint64_t)); - iiter->iter = cache->imm != NULL ? tSkipListCreateIter(cache->imm) : NULL; + iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL; iiter->next = indexCacheIteratorNext; iiter->getValue = indexCacheIteratorGetValue; @@ -220,8 +179,13 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { // ugly code, refactor later pthread_mutex_lock(&pCache->mtx); + indexCacheMakeRoomForWrite(pCache); - tSkipListPut(pCache->mem, (char*)ct); + MemTable* tbl = pCache->mem; + indexMemRef(tbl); + tSkipListPut(tbl->mem, (char*)ct); + indexMemUnRef(tbl); + pthread_mutex_unlock(&pCache->mtx); indexCacheUnRef(pCache); @@ -238,6 +202,14 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV SIndexTerm* term = query->term; EIndexQueryType qtype = query->qType; + MemTable *mem = NULL, *imm = NULL; + pthread_mutex_lock(&pCache->mtx); + mem = pCache->mem; + imm = pCache->imm; + indexMemRef(mem); + indexMemRef(imm); + pthread_mutex_unlock(&pCache->mtx); + CacheTerm* ct = calloc(1, sizeof(CacheTerm)); if (ct == NULL) { return -1; } ct->nColVal = term->nColVal; @@ -247,7 +219,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV char* key = getIndexKey(ct); // TODO handle multi situation later, and refactor - SSkipListIterator* iter = tSkipListCreateIterFromVal(pCache->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); while (tSkipListIterNext(iter)) { SSkipListNode* node = tSkipListIterGet(iter); if (node != NULL) { @@ -279,14 +251,108 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV } else if (qtype == QUERY_REGEX) { // } + indexMemUnRef(mem); + indexMemUnRef(imm); return 0; } void indexCacheRef(IndexCache* cache) { + if (cache == NULL) { return; } + int ref = T_REF_INC(cache); UNUSED(ref); } void indexCacheUnRef(IndexCache* cache) { + if (cache == NULL) { return; } + int ref = T_REF_DEC(cache); if (ref == 0) { indexCacheDestroy(cache); } } + +void indexMemRef(MemTable* tbl) { + if (tbl == NULL) { return; } + int ref = T_REF_INC(tbl); + UNUSED(ref); +} +void indexMemUnRef(MemTable* tbl) { + if (tbl == NULL) { return; } + + int ref = T_REF_DEC(tbl); + if (ref == 0) { + SSkipList* slt = tbl->mem; + indexCacheDestroySkiplist(slt); + free(tbl); + } +} + +static void cacheTermDestroy(CacheTerm* ct) { + if (ct == NULL) { return; } + + free(ct->colVal); + free(ct); +} +static char* getIndexKey(const void* pData) { + CacheTerm* p = (CacheTerm*)pData; + return (char*)p; +} + +static int32_t compareKey(const void* l, const void* r) { + CacheTerm* lt = (CacheTerm*)l; + CacheTerm* rt = (CacheTerm*)r; + + // compare colVal + int i, j; + for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) { + if (lt->colVal[i] == rt->colVal[j]) { + continue; + } else { + return lt->colVal[i] < rt->colVal[j] ? -1 : 1; + } + } + if (i < lt->nColVal) { + return 1; + } else if (j < rt->nColVal) { + return -1; + } + // compare version + return rt->version - lt->version; +} + +static MemTable* indexInternalCacheCreate(int8_t type) { + MemTable* tbl = calloc(1, sizeof(MemTable)); + indexMemRef(tbl); + if (type == TSDB_DATA_TYPE_BINARY) { + tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); + } + return tbl; +} + +static void doMergeWork(SSchedMsg* msg) { + IndexCache* pCache = msg->ahandle; + SIndex* sidx = (SIndex*)pCache->index; + indexFlushCacheTFile(sidx, pCache); +} +static bool indexCacheIteratorNext(Iterate* itera) { + SSkipListIterator* iter = itera->iter; + if (iter == NULL) { return false; } + + IterateValue* iv = &itera->val; + iterateValueDestroy(iv, false); + + bool next = tSkipListIterNext(iter); + if (next) { + SSkipListNode* node = tSkipListIterGet(iter); + CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); + + iv->type = ct->operaType; + iv->colVal = ct->colVal; + + taosArrayPush(iv->val, &ct->uid); + } + + return next; +} + +static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { + return &iter->val; +} -- GitLab