提交 cce557bd 编写于 作者: dengyihao's avatar dengyihao

handle cache write/reade concurrent problem

上级 80718aee
......@@ -30,14 +30,18 @@
extern "C" {
#endif
typedef struct MemTable {
T_REF_DECLARE()
SSkipList* mem;
} MemTable;
typedef struct IndexCache {
T_REF_DECLARE()
SSkipList *mem, *imm;
SIndex* index;
char* colName;
int32_t version;
int32_t nTerm;
int8_t type;
MemTable *mem, *imm;
SIndex* index;
char* colName;
int32_t version;
int32_t nTerm;
int8_t type;
pthread_mutex_t mtx;
} IndexCache;
......
......@@ -25,44 +25,20 @@
//#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
static void cacheTermDestroy(CacheTerm* ct) {
if (ct == NULL) { return; }
void indexMemRef(MemTable* tbl);
void indexMemUnRef(MemTable* tbl);
free(ct->colVal);
free(ct);
}
static char* getIndexKey(const void* pData) {
CacheTerm* p = (CacheTerm*)pData;
return (char*)p;
}
void indexCacheRef(IndexCache* cache);
void indexCacheUnRef(IndexCache* cache);
static int32_t compareKey(const void* l, const void* r) {
CacheTerm* lt = (CacheTerm*)l;
CacheTerm* rt = (CacheTerm*)r;
static void cacheTermDestroy(CacheTerm* ct);
static char* getIndexKey(const void* pData);
static int32_t compareKey(const void* l, const void* r);
static MemTable* indexInternalCacheCreate(int8_t type);
// compare colVal
int i, j;
for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) {
if (lt->colVal[i] == rt->colVal[j]) {
continue;
} else {
return lt->colVal[i] < rt->colVal[j] ? -1 : 1;
}
}
if (i < lt->nColVal) {
return 1;
} else if (j < rt->nColVal) {
return -1;
}
// compare version
return rt->version - lt->version;
}
static SSkipList* indexInternalCacheCreate(int8_t type) {
if (type == TSDB_DATA_TYPE_BINARY) {
return tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
}
}
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
IndexCache* cache = calloc(1, sizeof(IndexCache));
......@@ -83,7 +59,15 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
return cache;
}
void indexCacheDebug(IndexCache* cache) {
SSkipListIterator* iter = tSkipListCreateIter(cache->mem);
MemTable* tbl = NULL;
pthread_mutex_lock(&cache->mtx);
tbl = cache->mem;
indexMemRef(tbl);
pthread_mutex_unlock(&cache->mtx);
SSkipList* slt = tbl->mem;
SSkipListIterator* iter = tSkipListCreateIter(slt);
while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
......@@ -93,6 +77,8 @@ void indexCacheDebug(IndexCache* cache) {
}
}
tSkipListDestroyIter(iter);
indexMemUnRef(tbl);
}
void indexCacheDestroySkiplist(SSkipList* slt) {
......@@ -103,60 +89,33 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
if (ct != NULL) {}
}
tSkipListDestroyIter(iter);
tSkipListDestroy(slt);
}
void indexCacheDestroyImm(IndexCache* cache) {
MemTable* tbl = NULL;
pthread_mutex_lock(&cache->mtx);
SSkipList* timm = (SSkipList*)cache->imm;
tbl = cache->imm;
cache->imm = NULL; // or throw int bg thread
pthread_mutex_unlock(&cache->mtx);
indexCacheDestroySkiplist(timm);
indexMemUnRef(tbl);
}
void indexCacheDestroy(void* cache) {
IndexCache* pCache = cache;
if (pCache == NULL) { return; }
tSkipListDestroy(pCache->mem);
tSkipListDestroy(pCache->imm);
indexMemUnRef(pCache->mem);
indexMemUnRef(pCache->imm);
free(pCache->colName);
free(pCache);
}
static void doMergeWork(SSchedMsg* msg) {
IndexCache* pCache = msg->ahandle;
SIndex* sidx = (SIndex*)pCache->index;
indexFlushCacheTFile(sidx, pCache);
}
static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter;
if (iter == NULL) { return false; }
IterateValue* iv = &itera->val;
iterateValueDestroy(iv, false);
bool next = tSkipListIterNext(iter);
if (next) {
SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
iv->type = ct->operaType;
iv->colVal = ct->colVal;
taosArrayPush(iv->val, &ct->uid);
}
return next;
}
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
return &iter->val;
}
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
Iterate* iiter = calloc(1, sizeof(Iterate));
if (iiter == NULL) { return NULL; }
MemTable* tbl = cache->imm;
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
iiter->iter = cache->imm != NULL ? tSkipListCreateIter(cache->imm) : NULL;
iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
iiter->next = indexCacheIteratorNext;
iiter->getValue = indexCacheIteratorGetValue;
......@@ -220,8 +179,13 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
// ugly code, refactor later
pthread_mutex_lock(&pCache->mtx);
indexCacheMakeRoomForWrite(pCache);
tSkipListPut(pCache->mem, (char*)ct);
MemTable* tbl = pCache->mem;
indexMemRef(tbl);
tSkipListPut(tbl->mem, (char*)ct);
indexMemUnRef(tbl);
pthread_mutex_unlock(&pCache->mtx);
indexCacheUnRef(pCache);
......@@ -238,6 +202,14 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType;
MemTable *mem = NULL, *imm = NULL;
pthread_mutex_lock(&pCache->mtx);
mem = pCache->mem;
imm = pCache->imm;
indexMemRef(mem);
indexMemRef(imm);
pthread_mutex_unlock(&pCache->mtx);
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
if (ct == NULL) { return -1; }
ct->nColVal = term->nColVal;
......@@ -247,7 +219,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
char* key = getIndexKey(ct);
// TODO handle multi situation later, and refactor
SSkipListIterator* iter = tSkipListCreateIterFromVal(pCache->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter);
if (node != NULL) {
......@@ -279,14 +251,108 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
} else if (qtype == QUERY_REGEX) {
//
}
indexMemUnRef(mem);
indexMemUnRef(imm);
return 0;
}
void indexCacheRef(IndexCache* cache) {
if (cache == NULL) { return; }
int ref = T_REF_INC(cache);
UNUSED(ref);
}
void indexCacheUnRef(IndexCache* cache) {
if (cache == NULL) { return; }
int ref = T_REF_DEC(cache);
if (ref == 0) { indexCacheDestroy(cache); }
}
void indexMemRef(MemTable* tbl) {
if (tbl == NULL) { return; }
int ref = T_REF_INC(tbl);
UNUSED(ref);
}
void indexMemUnRef(MemTable* tbl) {
if (tbl == NULL) { return; }
int ref = T_REF_DEC(tbl);
if (ref == 0) {
SSkipList* slt = tbl->mem;
indexCacheDestroySkiplist(slt);
free(tbl);
}
}
static void cacheTermDestroy(CacheTerm* ct) {
if (ct == NULL) { return; }
free(ct->colVal);
free(ct);
}
static char* getIndexKey(const void* pData) {
CacheTerm* p = (CacheTerm*)pData;
return (char*)p;
}
static int32_t compareKey(const void* l, const void* r) {
CacheTerm* lt = (CacheTerm*)l;
CacheTerm* rt = (CacheTerm*)r;
// compare colVal
int i, j;
for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) {
if (lt->colVal[i] == rt->colVal[j]) {
continue;
} else {
return lt->colVal[i] < rt->colVal[j] ? -1 : 1;
}
}
if (i < lt->nColVal) {
return 1;
} else if (j < rt->nColVal) {
return -1;
}
// compare version
return rt->version - lt->version;
}
static MemTable* indexInternalCacheCreate(int8_t type) {
MemTable* tbl = calloc(1, sizeof(MemTable));
indexMemRef(tbl);
if (type == TSDB_DATA_TYPE_BINARY) {
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
}
return tbl;
}
static void doMergeWork(SSchedMsg* msg) {
IndexCache* pCache = msg->ahandle;
SIndex* sidx = (SIndex*)pCache->index;
indexFlushCacheTFile(sidx, pCache);
}
static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter;
if (iter == NULL) { return false; }
IterateValue* iv = &itera->val;
iterateValueDestroy(iv, false);
bool next = tSkipListIterNext(iter);
if (next) {
SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
iv->type = ct->operaType;
iv->colVal = ct->colVal;
taosArrayPush(iv->val, &ct->uid);
}
return next;
}
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
return &iter->val;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册