提交 eab330c0 编写于 作者: dengyihao's avatar dengyihao

enh: refactor index code

上级 d9c83f63
...@@ -62,25 +62,25 @@ typedef struct CacheTerm { ...@@ -62,25 +62,25 @@ typedef struct CacheTerm {
} CacheTerm; } CacheTerm;
// //
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type); IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type);
void indexCacheForceToMerge(void* cache); void idxCacheForceToMerge(void* cache);
void indexCacheDestroy(void* cache); void idxCacheDestroy(void* cache);
void indexCacheBroadcast(void* cache); void idxCacheBroadcast(void* cache);
void indexCacheWait(void* cache); void idxCacheWait(void* cache);
Iterate* indexCacheIteratorCreate(IndexCache* cache); Iterate* idxCacheIteratorCreate(IndexCache* cache);
void idxCacheIteratorDestroy(Iterate* iiter); void idxCacheIteratorDestroy(Iterate* iiter);
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid); int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid);
// int indexCacheGet(void *cache, uint64_t *rst); // int indexCacheGet(void *cache, uint64_t *rst);
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* tr, STermValueType* s); int idxCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* tr, STermValueType* s);
void indexCacheRef(IndexCache* cache); void idxCacheRef(IndexCache* cache);
void indexCacheUnRef(IndexCache* cache); void idxCacheUnRef(IndexCache* cache);
void indexCacheDebug(IndexCache* cache); void idxCacheDebug(IndexCache* cache);
void idxCacheDestroyImm(IndexCache* cache); void idxCacheDestroyImm(IndexCache* cache);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -138,7 +138,7 @@ int32_t indexRemoveRef(int64_t ref); ...@@ -138,7 +138,7 @@ int32_t indexRemoveRef(int64_t ref);
void indexAcquireRef(int64_t ref); void indexAcquireRef(int64_t ref);
void indexReleaseRef(int64_t ref); void indexReleaseRef(int64_t ref);
int32_t indexSerialCacheKey(ICacheKey* key, char* buf); int32_t idxSerialCacheKey(ICacheKey* key, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf);
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
......
...@@ -117,10 +117,10 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order); ...@@ -117,10 +117,10 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order);
int tfileWriterFinish(TFileWriter* tw); int tfileWriterFinish(TFileWriter* tw);
// //
IndexTFile* indexTFileCreate(const char* path); IndexTFile* idxTFileCreate(const char* path);
void indexTFileDestroy(IndexTFile* tfile); void idxTFileDestroy(IndexTFile* tfile);
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid); int idxTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* tr); int idxTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* tr);
Iterate* tfileIteratorCreate(TFileReader* reader); Iterate* tfileIteratorCreate(TFileReader* reader);
void tfileIteratorDestroy(Iterate* iterator); void tfileIteratorDestroy(Iterate* iterator);
......
...@@ -106,8 +106,8 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -106,8 +106,8 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
return -1; return -1;
} }
// sIdx->cache = (void*)indexCacheCreate(sIdx); // sIdx->cache = (void*)idxCacheCreate(sIdx);
sIdx->tindex = indexTFileCreate(path); sIdx->tindex = idxTFileCreate(path);
if (sIdx->tindex == NULL) { if (sIdx->tindex == NULL) {
goto END; goto END;
} }
...@@ -136,7 +136,7 @@ void indexDestroy(void* handle) { ...@@ -136,7 +136,7 @@ void indexDestroy(void* handle) {
SIndex* sIdx = handle; SIndex* sIdx = handle;
taosThreadMutexDestroy(&sIdx->mtx); taosThreadMutexDestroy(&sIdx->mtx);
tsem_destroy(&sIdx->sem); tsem_destroy(&sIdx->sem);
indexTFileDestroy(sIdx->tindex); idxTFileDestroy(sIdx->tindex);
taosMemoryFree(sIdx->path); taosMemoryFree(sIdx->path);
taosMemoryFree(sIdx); taosMemoryFree(sIdx);
return; return;
...@@ -147,12 +147,12 @@ void indexClose(SIndex* sIdx) { ...@@ -147,12 +147,12 @@ void indexClose(SIndex* sIdx) {
void* iter = taosHashIterate(sIdx->colObj, NULL); void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) { while (iter) {
IndexCache** pCache = iter; IndexCache** pCache = iter;
indexCacheForceToMerge((void*)(*pCache)); idxCacheForceToMerge((void*)(*pCache));
indexInfo("%s wait to merge", (*pCache)->colName); indexInfo("%s wait to merge", (*pCache)->colName);
indexWait((void*)(sIdx)); indexWait((void*)(sIdx));
indexInfo("%s finish to wait", (*pCache)->colName); indexInfo("%s finish to wait", (*pCache)->colName);
iter = taosHashIterate(sIdx->colObj, iter); iter = taosHashIterate(sIdx->colObj, iter);
indexCacheUnRef(*pCache); idxCacheUnRef(*pCache);
} }
taosHashCleanup(sIdx->colObj); taosHashCleanup(sIdx->colObj);
sIdx->colObj = NULL; sIdx->colObj = NULL;
...@@ -186,11 +186,11 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -186,11 +186,11 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType}; ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = idxSerialCacheKey(&key, buf);
IndexCache** cache = taosHashGet(index->colObj, buf, sz); IndexCache** cache = taosHashGet(index->colObj, buf, sz);
if (cache == NULL) { if (cache == NULL) {
IndexCache* pCache = indexCacheCreate(index, p->suid, p->colName, p->colType); IndexCache* pCache = idxCacheCreate(index, p->suid, p->colName, p->colType);
taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*)); taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
} }
} }
...@@ -201,12 +201,12 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -201,12 +201,12 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType}; ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = idxSerialCacheKey(&key, buf);
indexDebug("w suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType); indexDebug("w suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
IndexCache** cache = taosHashGet(index->colObj, buf, sz); IndexCache** cache = taosHashGet(index->colObj, buf, sz);
assert(*cache != NULL); assert(*cache != NULL);
int ret = indexCachePut(*cache, p, uid); int ret = idxCachePut(*cache, p, uid);
if (ret != 0) { if (ret != 0) {
return ret; return ret;
} }
...@@ -331,7 +331,7 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) ...@@ -331,7 +331,7 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result)
ICacheKey key = { ICacheKey key = {
.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType}; .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
indexDebug("r suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType); indexDebug("r suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = idxSerialCacheKey(&key, buf);
taosThreadMutexLock(&sIdx->mtx); taosThreadMutexLock(&sIdx->mtx);
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz); IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
...@@ -345,14 +345,14 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) ...@@ -345,14 +345,14 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result)
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
SIdxTRslt* tr = idxTRsltCreate(); SIdxTRslt* tr = idxTRsltCreate();
if (0 == indexCacheSearch(cache, query, tr, &s)) { if (0 == idxCacheSearch(cache, query, tr, &s)) {
if (s == kTypeDeletion) { if (s == kTypeDeletion) {
indexInfo("col: %s already drop by", term->colName); indexInfo("col: %s already drop by", term->colName);
// coloum already drop by other oper, no need to query tindex // coloum already drop by other oper, no need to query tindex
return 0; return 0;
} else { } else {
st = taosGetTimestampUs(); st = taosGetTimestampUs();
if (0 != indexTFileSearch(sIdx->tindex, query, tr)) { if (0 != idxTFileSearch(sIdx->tindex, query, tr)) {
indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal); indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
goto END; goto END;
} }
...@@ -472,7 +472,7 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { ...@@ -472,7 +472,7 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
indexWarn("empty tfile reader found"); indexWarn("empty tfile reader found");
} }
// handle flush // handle flush
Iterate* cacheIter = indexCacheIteratorCreate(pCache); Iterate* cacheIter = idxCacheIteratorCreate(pCache);
if (cacheIter == NULL) { if (cacheIter == NULL) {
indexError("%p immtable is empty, ignore merge opera", pCache); indexError("%p immtable is empty, ignore merge opera", pCache);
idxCacheDestroyImm(pCache); idxCacheDestroyImm(pCache);
...@@ -532,7 +532,7 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { ...@@ -532,7 +532,7 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
tfileIteratorDestroy(tfileIter); tfileIteratorDestroy(tfileIter);
tfileReaderUnRef(pReader); tfileReaderUnRef(pReader);
indexCacheUnRef(pCache); idxCacheUnRef(pCache);
int64_t cost = taosGetTimestampUs() - st; int64_t cost = taosGetTimestampUs() - st;
if (ret != 0) { if (ret != 0) {
...@@ -620,7 +620,7 @@ END: ...@@ -620,7 +620,7 @@ END:
return -1; return -1;
} }
int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { int32_t idxSerialCacheKey(ICacheKey* key, char* buf) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON); bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON);
char* p = buf; char* p = buf;
......
...@@ -68,7 +68,7 @@ static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTRsl ...@@ -68,7 +68,7 @@ static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTRsl
cacheSearchLessThan_JSON, cacheSearchLessEqual_JSON, cacheSearchGreaterThan_JSON, cacheSearchGreaterEqual_JSON, cacheSearchLessThan_JSON, cacheSearchLessEqual_JSON, cacheSearchGreaterThan_JSON, cacheSearchGreaterEqual_JSON,
cacheSearchRange_JSON}}; cacheSearchRange_JSON}};
static void doMergeWork(SSchedMsg* msg); static void idxDoMergeWork(SSchedMsg* msg);
static bool idxCacheIteratorNext(Iterate* itera); static bool idxCacheIteratorNext(Iterate* itera);
static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
...@@ -331,9 +331,9 @@ static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTRslt* tr, ST ...@@ -331,9 +331,9 @@ static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTRslt* tr, ST
// impl later // impl later
return 0; return 0;
} }
static IterateValue* indexCacheIteratorGetValue(Iterate* iter); static IterateValue* idxCacheIteratorGetValue(Iterate* iter);
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) { IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
IndexCache* cache = taosMemoryCalloc(1, sizeof(IndexCache)); IndexCache* cache = taosMemoryCalloc(1, sizeof(IndexCache));
if (cache == NULL) { if (cache == NULL) {
indexError("failed to create index cache"); indexError("failed to create index cache");
...@@ -352,13 +352,13 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in ...@@ -352,13 +352,13 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
taosThreadMutexInit(&cache->mtx, NULL); taosThreadMutexInit(&cache->mtx, NULL);
taosThreadCondInit(&cache->finished, NULL); taosThreadCondInit(&cache->finished, NULL);
indexCacheRef(cache); idxCacheRef(cache);
if (idx != NULL) { if (idx != NULL) {
indexAcquireRef(idx->refId); indexAcquireRef(idx->refId);
} }
return cache; return cache;
} }
void indexCacheDebug(IndexCache* cache) { void idxCacheDebug(IndexCache* cache) {
MemTable* tbl = NULL; MemTable* tbl = NULL;
taosThreadMutexLock(&cache->mtx); taosThreadMutexLock(&cache->mtx);
...@@ -405,7 +405,7 @@ void indexCacheDebug(IndexCache* cache) { ...@@ -405,7 +405,7 @@ void indexCacheDebug(IndexCache* cache) {
} }
} }
void indexCacheDestroySkiplist(SSkipList* slt) { void idxCacheDestroySkiplist(SSkipList* slt) {
SSkipListIterator* iter = tSkipListCreateIter(slt); SSkipListIterator* iter = tSkipListCreateIter(slt);
while (iter != NULL && tSkipListIterNext(iter)) { while (iter != NULL && tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter); SSkipListNode* node = tSkipListIterGet(iter);
...@@ -418,11 +418,11 @@ void indexCacheDestroySkiplist(SSkipList* slt) { ...@@ -418,11 +418,11 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
tSkipListDestroy(slt); tSkipListDestroy(slt);
} }
void indexCacheBroadcast(void* cache) { void idxCacheBroadcast(void* cache) {
IndexCache* pCache = cache; IndexCache* pCache = cache;
taosThreadCondBroadcast(&pCache->finished); taosThreadCondBroadcast(&pCache->finished);
} }
void indexCacheWait(void* cache) { void idxCacheWait(void* cache) {
IndexCache* pCache = cache; IndexCache* pCache = cache;
taosThreadCondWait(&pCache->finished, &pCache->mtx); taosThreadCondWait(&pCache->finished, &pCache->mtx);
} }
...@@ -435,14 +435,14 @@ void idxCacheDestroyImm(IndexCache* cache) { ...@@ -435,14 +435,14 @@ void idxCacheDestroyImm(IndexCache* cache) {
tbl = cache->imm; tbl = cache->imm;
cache->imm = NULL; // or throw int bg thread cache->imm = NULL; // or throw int bg thread
indexCacheBroadcast(cache); idxCacheBroadcast(cache);
taosThreadMutexUnlock(&cache->mtx); taosThreadMutexUnlock(&cache->mtx);
idxMemUnRef(tbl); idxMemUnRef(tbl);
idxMemUnRef(tbl); idxMemUnRef(tbl);
} }
void indexCacheDestroy(void* cache) { void idxCacheDestroy(void* cache) {
IndexCache* pCache = cache; IndexCache* pCache = cache;
if (pCache == NULL) { if (pCache == NULL) {
return; return;
...@@ -460,7 +460,7 @@ void indexCacheDestroy(void* cache) { ...@@ -460,7 +460,7 @@ void indexCacheDestroy(void* cache) {
taosMemoryFree(pCache); taosMemoryFree(pCache);
} }
Iterate* indexCacheIteratorCreate(IndexCache* cache) { Iterate* idxCacheIteratorCreate(IndexCache* cache) {
if (cache->imm == NULL) { if (cache->imm == NULL) {
return NULL; return NULL;
} }
...@@ -477,7 +477,7 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) { ...@@ -477,7 +477,7 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
iiter->val.colVal = NULL; iiter->val.colVal = NULL;
iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL; iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
iiter->next = idxCacheIteratorNext; iiter->next = idxCacheIteratorNext;
iiter->getValue = indexCacheIteratorGetValue; iiter->getValue = idxCacheIteratorGetValue;
taosThreadMutexUnlock(&cache->mtx); taosThreadMutexUnlock(&cache->mtx);
...@@ -492,9 +492,9 @@ void idxCacheIteratorDestroy(Iterate* iter) { ...@@ -492,9 +492,9 @@ void idxCacheIteratorDestroy(Iterate* iter) {
taosMemoryFree(iter); taosMemoryFree(iter);
} }
int indexCacheSchedToMerge(IndexCache* pCache, bool notify) { int idxCacheSchedToMerge(IndexCache* pCache, bool notify) {
SSchedMsg schedMsg = {0}; SSchedMsg schedMsg = {0};
schedMsg.fp = doMergeWork; schedMsg.fp = idxDoMergeWork;
schedMsg.ahandle = pCache; schedMsg.ahandle = pCache;
if (notify) { if (notify) {
schedMsg.thandle = taosMemoryMalloc(1); schedMsg.thandle = taosMemoryMalloc(1);
...@@ -505,17 +505,17 @@ int indexCacheSchedToMerge(IndexCache* pCache, bool notify) { ...@@ -505,17 +505,17 @@ int indexCacheSchedToMerge(IndexCache* pCache, bool notify) {
return 0; return 0;
} }
static void indexCacheMakeRoomForWrite(IndexCache* cache) { static void idxCacheMakeRoomForWrite(IndexCache* cache) {
while (true) { while (true) {
if (cache->occupiedMem * MEM_ESTIMATE_RADIO < MEM_THRESHOLD) { if (cache->occupiedMem * MEM_ESTIMATE_RADIO < MEM_THRESHOLD) {
break; break;
} else if (cache->imm != NULL) { } else if (cache->imm != NULL) {
// TODO: wake up by condition variable // TODO: wake up by condition variable
indexCacheWait(cache); idxCacheWait(cache);
} else { } else {
bool quit = cache->occupiedMem >= MEM_SIGNAL_QUIT ? true : false; bool quit = cache->occupiedMem >= MEM_SIGNAL_QUIT ? true : false;
indexCacheRef(cache); idxCacheRef(cache);
cache->imm = cache->mem; cache->imm = cache->mem;
cache->mem = idxInternalCacheCreate(cache->type); cache->mem = idxInternalCacheCreate(cache->type);
cache->mem->pCache = cache; cache->mem->pCache = cache;
...@@ -525,18 +525,18 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { ...@@ -525,18 +525,18 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
} }
// sched to merge // sched to merge
// unref cache in bgwork // unref cache in bgwork
indexCacheSchedToMerge(cache, quit); idxCacheSchedToMerge(cache, quit);
} }
} }
} }
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
if (cache == NULL) { if (cache == NULL) {
return -1; return -1;
} }
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
IndexCache* pCache = cache; IndexCache* pCache = cache;
indexCacheRef(pCache); idxCacheRef(pCache);
// encode data // encode data
CacheTerm* ct = taosMemoryCalloc(1, sizeof(CacheTerm)); CacheTerm* ct = taosMemoryCalloc(1, sizeof(CacheTerm));
if (cache == NULL) { if (cache == NULL) {
...@@ -559,7 +559,7 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { ...@@ -559,7 +559,7 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
taosThreadMutexLock(&pCache->mtx); taosThreadMutexLock(&pCache->mtx);
pCache->occupiedMem += estimate; pCache->occupiedMem += estimate;
indexCacheMakeRoomForWrite(pCache); idxCacheMakeRoomForWrite(pCache);
MemTable* tbl = pCache->mem; MemTable* tbl = pCache->mem;
idxMemRef(tbl); idxMemRef(tbl);
tSkipListPut(tbl->mem, (char*)ct); tSkipListPut(tbl->mem, (char*)ct);
...@@ -567,29 +567,29 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { ...@@ -567,29 +567,29 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
taosThreadMutexUnlock(&pCache->mtx); taosThreadMutexUnlock(&pCache->mtx);
indexCacheUnRef(pCache); idxCacheUnRef(pCache);
return 0; return 0;
// encode end // encode end
} }
void indexCacheForceToMerge(void* cache) { void idxCacheForceToMerge(void* cache) {
IndexCache* pCache = cache; IndexCache* pCache = cache;
indexCacheRef(pCache); idxCacheRef(pCache);
taosThreadMutexLock(&pCache->mtx); taosThreadMutexLock(&pCache->mtx);
indexInfo("%p is forced to merge into tfile", pCache); indexInfo("%p is forced to merge into tfile", pCache);
pCache->occupiedMem += MEM_SIGNAL_QUIT; pCache->occupiedMem += MEM_SIGNAL_QUIT;
indexCacheMakeRoomForWrite(pCache); idxCacheMakeRoomForWrite(pCache);
taosThreadMutexUnlock(&pCache->mtx); taosThreadMutexUnlock(&pCache->mtx);
indexCacheUnRef(pCache); idxCacheUnRef(pCache);
return; return;
} }
int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { int idxCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
IndexCache* pCache = cache; IndexCache* pCache = cache;
return 0; return 0;
} }
static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTRslt* tr, STermValueType* s) { static int32_t idxQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTRslt* tr, STermValueType* s) {
if (mem == NULL) { if (mem == NULL) {
return 0; return 0;
} }
...@@ -603,7 +603,7 @@ static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTRslt* t ...@@ -603,7 +603,7 @@ static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTRslt* t
return cacheSearch[0][qtype](mem, term, tr, s); return cacheSearch[0][qtype](mem, term, tr, s);
} }
} }
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STermValueType* s) { int idxCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STermValueType* s) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
if (cache == NULL) { if (cache == NULL) {
return 0; return 0;
...@@ -618,10 +618,10 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STe ...@@ -618,10 +618,10 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STe
idxMemRef(imm); idxMemRef(imm);
taosThreadMutexUnlock(&pCache->mtx); taosThreadMutexUnlock(&pCache->mtx);
int ret = (mem && mem->mem) ? indexQueryMem(mem, query, result, s) : 0; int ret = (mem && mem->mem) ? idxQueryMem(mem, query, result, s) : 0;
if (ret == 0 && *s != kTypeDeletion) { if (ret == 0 && *s != kTypeDeletion) {
// continue search in imm // continue search in imm
ret = (imm && imm->mem) ? indexQueryMem(imm, query, result, s) : 0; ret = (imm && imm->mem) ? idxQueryMem(imm, query, result, s) : 0;
} }
idxMemUnRef(mem); idxMemUnRef(mem);
...@@ -631,20 +631,20 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STe ...@@ -631,20 +631,20 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STe
return ret; return ret;
} }
void indexCacheRef(IndexCache* cache) { void idxCacheRef(IndexCache* cache) {
if (cache == NULL) { if (cache == NULL) {
return; return;
} }
int ref = T_REF_INC(cache); int ref = T_REF_INC(cache);
UNUSED(ref); UNUSED(ref);
} }
void indexCacheUnRef(IndexCache* cache) { void idxCacheUnRef(IndexCache* cache) {
if (cache == NULL) { if (cache == NULL) {
return; return;
} }
int ref = T_REF_DEC(cache); int ref = T_REF_DEC(cache);
if (ref == 0) { if (ref == 0) {
indexCacheDestroy(cache); idxCacheDestroy(cache);
} }
} }
...@@ -662,7 +662,7 @@ void idxMemUnRef(MemTable* tbl) { ...@@ -662,7 +662,7 @@ void idxMemUnRef(MemTable* tbl) {
int ref = T_REF_DEC(tbl); int ref = T_REF_DEC(tbl);
if (ref == 0) { if (ref == 0) {
SSkipList* slt = tbl->mem; SSkipList* slt = tbl->mem;
indexCacheDestroySkiplist(slt); idxCacheDestroySkiplist(slt);
taosMemoryFree(tbl); taosMemoryFree(tbl);
} }
} }
...@@ -693,15 +693,15 @@ static int32_t idxCacheTermCompare(const void* l, const void* r) { ...@@ -693,15 +693,15 @@ static int32_t idxCacheTermCompare(const void* l, const void* r) {
return cmp; return cmp;
} }
static int indexFindCh(char* a, char c) { static int idxFindCh(char* a, char c) {
char* p = a; char* p = a;
while (*p != 0 && *p++ != c) { while (*p != 0 && *p++ != c) {
} }
return p - a; return p - a;
} }
static int idxCacheJsonTermCompareImpl(char* a, char* b) { static int idxCacheJsonTermCompareImpl(char* a, char* b) {
// int alen = indexFindCh(a, '&'); // int alen = idxFindCh(a, '&');
// int blen = indexFindCh(b, '&'); // int blen = idxFindCh(b, '&');
// int cmp = strncmp(a, b, MIN(alen, blen)); // int cmp = strncmp(a, b, MIN(alen, blen));
// if (cmp == 0) { // if (cmp == 0) {
...@@ -742,7 +742,7 @@ static MemTable* idxInternalCacheCreate(int8_t type) { ...@@ -742,7 +742,7 @@ static MemTable* idxInternalCacheCreate(int8_t type) {
return tbl; return tbl;
} }
static void doMergeWork(SSchedMsg* msg) { static void idxDoMergeWork(SSchedMsg* msg) {
IndexCache* pCache = msg->ahandle; IndexCache* pCache = msg->ahandle;
SIndex* sidx = (SIndex*)pCache->index; SIndex* sidx = (SIndex*)pCache->index;
...@@ -771,7 +771,7 @@ static bool idxCacheIteratorNext(Iterate* itera) { ...@@ -771,7 +771,7 @@ static bool idxCacheIteratorNext(Iterate* itera) {
return next; return next;
} }
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { static IterateValue* idxCacheIteratorGetValue(Iterate* iter) {
// opt later // opt later
return &iter->val; return &iter->val;
} }
...@@ -118,7 +118,7 @@ TFileCache* tfileCacheCreate(const char* path) { ...@@ -118,7 +118,7 @@ TFileCache* tfileCacheCreate(const char* path) {
ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = (int32_t)strlen(header->colName)}; ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = (int32_t)strlen(header->colName)};
char buf[128] = {0}; char buf[128] = {0};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = idxSerialCacheKey(&key, buf);
assert(sz < sizeof(buf)); assert(sz < sizeof(buf));
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
tfileReaderRef(reader); tfileReaderRef(reader);
...@@ -149,7 +149,7 @@ void tfileCacheDestroy(TFileCache* tcache) { ...@@ -149,7 +149,7 @@ void tfileCacheDestroy(TFileCache* tcache) {
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) { TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
char buf[128] = {0}; char buf[128] = {0};
int32_t sz = indexSerialCacheKey(key, buf); int32_t sz = idxSerialCacheKey(key, buf);
assert(sz < sizeof(buf)); assert(sz < sizeof(buf));
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz); TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
if (reader == NULL || *reader == NULL) { if (reader == NULL || *reader == NULL) {
...@@ -161,7 +161,7 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) { ...@@ -161,7 +161,7 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
} }
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
char buf[128] = {0}; char buf[128] = {0};
int32_t sz = indexSerialCacheKey(key, buf); int32_t sz = idxSerialCacheKey(key, buf);
// remove last version index reader // remove last version index reader
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz); TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
if (p != NULL && *p != NULL) { if (p != NULL && *p != NULL) {
...@@ -620,7 +620,7 @@ void tfileWriterDestroy(TFileWriter* tw) { ...@@ -620,7 +620,7 @@ void tfileWriterDestroy(TFileWriter* tw) {
taosMemoryFree(tw); taosMemoryFree(tw);
} }
IndexTFile* indexTFileCreate(const char* path) { IndexTFile* idxTFileCreate(const char* path) {
TFileCache* cache = tfileCacheCreate(path); TFileCache* cache = tfileCacheCreate(path);
if (cache == NULL) { if (cache == NULL) {
return NULL; return NULL;
...@@ -635,7 +635,7 @@ IndexTFile* indexTFileCreate(const char* path) { ...@@ -635,7 +635,7 @@ IndexTFile* indexTFileCreate(const char* path) {
tfile->cache = cache; tfile->cache = cache;
return tfile; return tfile;
} }
void indexTFileDestroy(IndexTFile* tfile) { void idxTFileDestroy(IndexTFile* tfile) {
if (tfile == NULL) { if (tfile == NULL) {
return; return;
} }
...@@ -644,7 +644,7 @@ void indexTFileDestroy(IndexTFile* tfile) { ...@@ -644,7 +644,7 @@ void indexTFileDestroy(IndexTFile* tfile) {
taosMemoryFree(tfile); taosMemoryFree(tfile);
} }
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) { int idxTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) {
int ret = -1; int ret = -1;
if (tfile == NULL) { if (tfile == NULL) {
return ret; return ret;
...@@ -667,7 +667,7 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) { ...@@ -667,7 +667,7 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) {
return tfileReaderSearch(reader, query, result); return tfileReaderSearch(reader, query, result);
} }
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) { int idxTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
// TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = // TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName =
// term->nColName, .version = 1}; // term->nColName, .version = 1};
......
...@@ -521,10 +521,10 @@ class CacheObj { ...@@ -521,10 +521,10 @@ class CacheObj {
public: public:
CacheObj() { CacheObj() {
// TODO // TODO
cache = indexCacheCreate(NULL, 0, "voltage", TSDB_DATA_TYPE_BINARY); cache = idxCacheCreate(NULL, 0, "voltage", TSDB_DATA_TYPE_BINARY);
} }
int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) { int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) {
int ret = indexCachePut(cache, term, uid); int ret = idxCachePut(cache, term, uid);
if (ret != 0) { if (ret != 0) {
// //
std::cout << "failed to put into cache: " << ret << std::endl; std::cout << "failed to put into cache: " << ret << std::endl;
...@@ -533,12 +533,12 @@ class CacheObj { ...@@ -533,12 +533,12 @@ class CacheObj {
} }
void Debug() { void Debug() {
// //
indexCacheDebug(cache); idxCacheDebug(cache);
} }
int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) {
SIdxTRslt* tr = idxTRsltCreate(); SIdxTRslt* tr = idxTRsltCreate();
int ret = indexCacheSearch(cache, query, tr, s); int ret = idxCacheSearch(cache, query, tr, s);
idxTRsltMergeTo(tr, result); idxTRsltMergeTo(tr, result);
idxTRsltDestroy(tr); idxTRsltDestroy(tr);
...@@ -549,7 +549,7 @@ class CacheObj { ...@@ -549,7 +549,7 @@ class CacheObj {
} }
~CacheObj() { ~CacheObj() {
// TODO // TODO
indexCacheDestroy(cache); idxCacheDestroy(cache);
} }
private: private:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册