未验证 提交 07eb2516 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9464 from taosdata/feature/index_cache

handle cache write/reade concurrent problem
......@@ -30,14 +30,18 @@
extern "C" {
#endif
typedef struct MemTable {
T_REF_DECLARE()
SSkipList* mem;
} MemTable;
typedef struct IndexCache {
T_REF_DECLARE()
SSkipList *mem, *imm;
SIndex* index;
char* colName;
int32_t version;
int32_t nTerm;
int8_t type;
MemTable *mem, *imm;
SIndex* index;
char* colName;
int32_t version;
int32_t nTerm;
int8_t type;
pthread_mutex_t mtx;
} IndexCache;
......@@ -45,7 +49,6 @@ typedef struct IndexCache {
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
typedef struct CacheTerm {
// key
int32_t nColVal;
char* colVal;
int32_t version;
// value
......
......@@ -34,9 +34,7 @@ int32_t indexInit() {
return indexQhandle == NULL ? -1 : 0;
// do nothing
}
void indexCleanUp() {
taosCleanUpScheduler(indexQhandle);
}
void indexCleanUp() { taosCleanUpScheduler(indexQhandle); }
static int uidCompare(const void* a, const void* b) {
uint64_t u1 = *(uint64_t*)a;
......@@ -63,7 +61,9 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
// pthread_once(&isInit, indexInit);
SIndex* sIdx = calloc(1, sizeof(SIndex));
if (sIdx == NULL) { return -1; }
if (sIdx == NULL) {
return -1;
}
#ifdef USE_LUCENE
index_t* index = index_open(path);
......@@ -99,7 +99,9 @@ void indexClose(SIndex* sIdx) {
void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) {
IndexCache** pCache = iter;
if (*pCache) { indexCacheUnRef(*pCache); }
if (*pCache) {
indexCacheUnRef(*pCache);
}
iter = taosHashIterate(sIdx->colObj, iter);
}
taosHashCleanup(sIdx->colObj);
......@@ -133,7 +135,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i);
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
if (*cache == NULL) {
if (cache == NULL) {
IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType);
taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*));
}
......@@ -143,10 +145,11 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i);
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
assert(*cache != NULL);
int ret = indexCachePut(*cache, p, uid);
if (ret != 0) { return ret; }
if (ret != 0) {
return ret;
}
}
#endif
......@@ -224,17 +227,20 @@ SIndexOpts* indexOptsCreate() {
#endif
return NULL;
}
void indexOptsDestroy(SIndexOpts* opts){
void indexOptsDestroy(SIndexOpts* opts) {
#ifdef USE_LUCENE
#endif
} /*
* @param: oper
*
*/
return;
}
/*
* @param: oper
*
*/
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
if (p == NULL) { return NULL; }
if (p == NULL) {
return NULL;
}
p->opera = opera;
p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
return p;
......@@ -253,15 +259,12 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
return 0;
}
SIndexTerm* indexTermCreate(int64_t suid,
SIndexOperOnColumn oper,
uint8_t colType,
const char* colName,
int32_t nColName,
const char* colVal,
int32_t nColVal) {
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
int32_t nColName, const char* colVal, int32_t nColVal) {
SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm)));
if (t == NULL) { return NULL; }
if (t == NULL) {
return NULL;
}
t->suid = suid;
t->operType = oper;
......@@ -282,9 +285,7 @@ void indexTermDestroy(SIndexTerm* p) {
free(p);
}
SIndexMultiTerm* indexMultiTermCreate() {
return taosArrayInit(4, sizeof(SIndexTerm*));
}
SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); }
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
taosArrayPush(terms, &term);
......@@ -307,7 +308,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
IndexCache* cache = NULL;
pthread_mutex_lock(&sIdx->mtx);
IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName);
if (*pCache == NULL) {
if (pCache == NULL) {
pthread_mutex_unlock(&sIdx->mtx);
return -1;
}
......@@ -335,7 +336,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
return 0;
}
static void indexInterResultsDestroy(SArray* results) {
if (results == NULL) { return; }
if (results == NULL) {
return;
}
size_t sz = taosArrayGetSize(results);
for (size_t i = 0; i < sz; i++) {
......@@ -366,7 +369,9 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
}
int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
if (sIdx == NULL) { return -1; }
if (sIdx == NULL) {
return -1;
}
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
IndexCache* pCache = (IndexCache*)cache;
......@@ -399,7 +404,6 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
TFileValue* tfv = tfileValueCreate(cv->colVal);
taosArrayAddAll(tfv->tableId, cv->val);
taosArrayPush(result, &tfv);
// copy to final Result;
cn = cacheIter->next(cacheIter);
} else {
......@@ -433,7 +437,9 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
indexError("faile to open file to write");
} else {
int ret = tfileWriterPut(tw, result);
if (ret != 0) { indexError("faile to write into tindex "); }
if (ret != 0) {
indexError("faile to write into tindex ");
}
}
// not free later, just put int table cache
indexCacheDestroyImm(pCache);
......
......@@ -23,46 +23,22 @@
#define MEM_TERM_LIMIT 1000000
// ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
// sizeof(p->operType))
static void cacheTermDestroy(CacheTerm* ct) {
if (ct == NULL) { return; }
static void indexMemRef(MemTable* tbl);
static void indexMemUnRef(MemTable* tbl);
free(ct->colVal);
free(ct);
}
static char* getIndexKey(const void* pData) {
CacheTerm* p = (CacheTerm*)pData;
return (char*)p;
}
static void cacheTermDestroy(CacheTerm* ct);
static char* getIndexKey(const void* pData);
static int32_t compareKey(const void* l, const void* r);
static int32_t compareKey(const void* l, const void* r) {
CacheTerm* lt = (CacheTerm*)l;
CacheTerm* rt = (CacheTerm*)r;
static MemTable* indexInternalCacheCreate(int8_t type);
// compare colVal
int i, j;
for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) {
if (lt->colVal[i] == rt->colVal[j]) {
continue;
} else {
return lt->colVal[i] < rt->colVal[j] ? -1 : 1;
}
}
if (i < lt->nColVal) {
return 1;
} else if (j < rt->nColVal) {
return -1;
}
// compare version
return rt->version - lt->version;
}
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);
static SSkipList* indexInternalCacheCreate(int8_t type) {
if (type == TSDB_DATA_TYPE_BINARY) {
return tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
}
}
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
IndexCache* cache = calloc(1, sizeof(IndexCache));
......@@ -83,7 +59,15 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
return cache;
}
void indexCacheDebug(IndexCache* cache) {
SSkipListIterator* iter = tSkipListCreateIter(cache->mem);
MemTable* tbl = NULL;
pthread_mutex_lock(&cache->mtx);
tbl = cache->mem;
indexMemRef(tbl);
pthread_mutex_unlock(&cache->mtx);
SSkipList* slt = tbl->mem;
SSkipListIterator* iter = tSkipListCreateIter(slt);
while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
......@@ -93,6 +77,8 @@ void indexCacheDebug(IndexCache* cache) {
}
}
tSkipListDestroyIter(iter);
indexMemUnRef(tbl);
}
void indexCacheDestroySkiplist(SSkipList* slt) {
......@@ -100,71 +86,50 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
if (ct != NULL) {}
if (ct != NULL) {
}
}
tSkipListDestroyIter(iter);
tSkipListDestroy(slt);
}
void indexCacheDestroyImm(IndexCache* cache) {
MemTable* tbl = NULL;
pthread_mutex_lock(&cache->mtx);
SSkipList* timm = (SSkipList*)cache->imm;
tbl = cache->imm;
cache->imm = NULL; // or throw int bg thread
pthread_mutex_unlock(&cache->mtx);
indexCacheDestroySkiplist(timm);
indexMemUnRef(tbl);
}
void indexCacheDestroy(void* cache) {
IndexCache* pCache = cache;
if (pCache == NULL) { return; }
tSkipListDestroy(pCache->mem);
tSkipListDestroy(pCache->imm);
if (pCache == NULL) {
return;
}
indexMemUnRef(pCache->mem);
indexMemUnRef(pCache->imm);
free(pCache->colName);
free(pCache);
}
static void doMergeWork(SSchedMsg* msg) {
IndexCache* pCache = msg->ahandle;
SIndex* sidx = (SIndex*)pCache->index;
indexFlushCacheTFile(sidx, pCache);
}
static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter;
if (iter == NULL) { return false; }
IterateValue* iv = &itera->val;
iterateValueDestroy(iv, false);
bool next = tSkipListIterNext(iter);
if (next) {
SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
iv->type = ct->operaType;
iv->colVal = ct->colVal;
taosArrayPush(iv->val, &ct->uid);
}
return next;
}
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
return &iter->val;
}
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
Iterate* iiter = calloc(1, sizeof(Iterate));
if (iiter == NULL) { return NULL; }
if (iiter == NULL) {
return NULL;
}
MemTable* tbl = cache->imm;
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
iiter->iter = cache->imm != NULL ? tSkipListCreateIter(cache->imm) : NULL;
iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
iiter->next = indexCacheIteratorNext;
iiter->getValue = indexCacheIteratorGetValue;
return iiter;
}
void indexCacheIteratorDestroy(Iterate* iter) {
if (iter == NULL) { return; }
if (iter == NULL) {
return;
}
tSkipListDestroyIter(iter->iter);
iterateValueDestroy(&iter->val, true);
free(iter);
......@@ -201,18 +166,21 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
}
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
if (cache == NULL) { return -1; }
if (cache == NULL) {
return -1;
}
IndexCache* pCache = cache;
indexCacheRef(pCache);
// encode data
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
if (cache == NULL) { return -1; }
if (cache == NULL) {
return -1;
}
// set up key
ct->colType = term->colType;
ct->nColVal = term->nColVal;
ct->colVal = (char*)calloc(1, sizeof(char) * (ct->nColVal + 1));
memcpy(ct->colVal, term->colVal, ct->nColVal);
ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
memcpy(ct->colVal, term->colVal, term->nColVal);
ct->version = atomic_add_fetch_32(&pCache->version, 1);
// set value
ct->uid = uid;
......@@ -220,8 +188,13 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
// ugly code, refactor later
pthread_mutex_lock(&pCache->mtx);
indexCacheMakeRoomForWrite(pCache);
tSkipListPut(pCache->mem, (char*)ct);
MemTable* tbl = pCache->mem;
indexMemRef(tbl);
tSkipListPut(tbl->mem, (char*)ct);
indexMemUnRef(tbl);
pthread_mutex_unlock(&pCache->mtx);
indexCacheUnRef(pCache);
......@@ -233,27 +206,38 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
return 0;
}
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
if (cache == NULL) { return -1; }
if (cache == NULL) {
return -1;
}
IndexCache* pCache = cache;
SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType;
MemTable *mem = NULL, *imm = NULL;
pthread_mutex_lock(&pCache->mtx);
mem = pCache->mem;
imm = pCache->imm;
indexMemRef(mem);
indexMemRef(imm);
pthread_mutex_unlock(&pCache->mtx);
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
if (ct == NULL) { return -1; }
ct->nColVal = term->nColVal;
ct->colVal = calloc(1, sizeof(char) * (ct->nColVal + 1));
memcpy(ct->colVal, term->colVal, ct->nColVal);
if (ct == NULL) {
return -1;
}
ct->colVal = calloc(1, sizeof(char) * (term->nColVal + 1));
memcpy(ct->colVal, term->colVal, term->nColVal);
ct->version = atomic_load_32(&pCache->version);
char* key = getIndexKey(ct);
// TODO handle multi situation later, and refactor
SSkipListIterator* iter = tSkipListCreateIterFromVal(pCache->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter);
if (node != NULL) {
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) {
if (c->nColVal == ct->nColVal && strncmp(c->colVal, ct->colVal, c->nColVal) == 0) {
if (strcmp(c->colVal, ct->colVal) == 0) {
taosArrayPush(result, &c->uid);
*s = kTypeValue;
} else {
......@@ -279,14 +263,104 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
} else if (qtype == QUERY_REGEX) {
//
}
indexMemUnRef(mem);
indexMemUnRef(imm);
return 0;
}
void indexCacheRef(IndexCache* cache) {
if (cache == NULL) {
return;
}
int ref = T_REF_INC(cache);
UNUSED(ref);
}
void indexCacheUnRef(IndexCache* cache) {
if (cache == NULL) {
return;
}
int ref = T_REF_DEC(cache);
if (ref == 0) { indexCacheDestroy(cache); }
if (ref == 0) {
indexCacheDestroy(cache);
}
}
void indexMemRef(MemTable* tbl) {
if (tbl == NULL) {
return;
}
int ref = T_REF_INC(tbl);
UNUSED(ref);
}
void indexMemUnRef(MemTable* tbl) {
if (tbl == NULL) {
return;
}
int ref = T_REF_DEC(tbl);
if (ref == 0) {
SSkipList* slt = tbl->mem;
indexCacheDestroySkiplist(slt);
free(tbl);
}
}
static void cacheTermDestroy(CacheTerm* ct) {
if (ct == NULL) {
return;
}
free(ct->colVal);
free(ct);
}
static char* getIndexKey(const void* pData) {
CacheTerm* p = (CacheTerm*)pData;
return (char*)p;
}
static int32_t compareKey(const void* l, const void* r) {
CacheTerm* lt = (CacheTerm*)l;
CacheTerm* rt = (CacheTerm*)r;
// compare colVal
int32_t cmp = strcmp(lt->colVal, rt->colVal);
if (cmp == 0) {
return rt->version - lt->version;
}
return cmp;
}
static MemTable* indexInternalCacheCreate(int8_t type) {
MemTable* tbl = calloc(1, sizeof(MemTable));
indexMemRef(tbl);
if (type == TSDB_DATA_TYPE_BINARY) {
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
}
return tbl;
}
static void doMergeWork(SSchedMsg* msg) {
IndexCache* pCache = msg->ahandle;
SIndex* sidx = (SIndex*)pCache->index;
indexFlushCacheTFile(sidx, pCache);
}
static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter;
if (iter == NULL) {
return false;
}
IterateValue* iv = &itera->val;
iterateValueDestroy(iv, false);
bool next = tSkipListIterNext(iter);
if (next) {
SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
iv->type = ct->operaType;
iv->colVal = ct->colVal;
taosArrayPush(iv->val, &ct->uid);
}
return next;
}
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { return &iter->val; }
......@@ -54,7 +54,9 @@ static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
TFileCache* tfileCacheCreate(const char* path) {
TFileCache* tcache = calloc(1, sizeof(TFileCache));
if (tcache == NULL) { return NULL; }
if (tcache == NULL) {
return NULL;
}
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tcache->capacity = 64;
......@@ -83,7 +85,10 @@ TFileCache* tfileCacheCreate(const char* path) {
tfileReaderRef(reader);
// loader fst and validate it
TFileHeader* header = &reader->header;
TFileCacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
TFileCacheKey key = {.suid = header->suid,
.colName = header->colName,
.nColName = strlen(header->colName),
.colType = header->colType};
char buf[128] = {0};
tfileSerialCacheKey(&key, buf);
......@@ -97,13 +102,16 @@ End:
return NULL;
}
void tfileCacheDestroy(TFileCache* tcache) {
if (tcache == NULL) { return; }
if (tcache == NULL) {
return;
}
// free table cache
TFileReader** reader = taosHashIterate(tcache->tableCache, NULL);
while (reader) {
TFileReader* p = *reader;
indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, p->header.colType);
indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName,
p->header.colType);
tfileReaderUnRef(p);
reader = taosHashIterate(tcache->tableCache, reader);
......@@ -116,10 +124,13 @@ TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) {
char buf[128] = {0};
tfileSerialCacheKey(key, buf);
TFileReader* reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
tfileReaderRef(reader);
TFileReader** reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
if (reader == NULL) {
return NULL;
}
tfileReaderRef(*reader);
return reader;
return *reader;
}
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) {
char buf[128] = {0};
......@@ -138,14 +149,17 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader)
}
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
TFileReader* reader = calloc(1, sizeof(TFileReader));
if (reader == NULL) { return NULL; }
if (reader == NULL) {
return NULL;
}
// T_REF_INC(reader);
reader->ctx = ctx;
if (0 != tfileReaderLoadHeader(reader)) {
tfileReaderDestroy(reader);
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName);
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
reader->header.colName);
return NULL;
}
......@@ -158,7 +172,9 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
return reader;
}
void tfileReaderDestroy(TFileReader* reader) {
if (reader == NULL) { return; }
if (reader == NULL) {
return;
}
// T_REF_INC(reader);
fstDestroy(reader->fst);
writerCtxDestroy(reader->ctx);
......@@ -175,10 +191,12 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
uint64_t offset;
FstSlice key = fstSliceCreate(term->colVal, term->nColVal);
if (fstGet(reader->fst, &key, &offset)) {
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, term->colVal);
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName,
term->colVal);
ret = tfileReaderLoadTableIds(reader, offset, result);
} else {
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, term->colVal);
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName,
term->colVal);
}
fstSliceDestroy(&key);
} else if (qtype == QUERY_PREFIX) {
......@@ -304,12 +322,16 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
return 0;
}
void tfileWriteClose(TFileWriter* tw) {
if (tw == NULL) { return; }
if (tw == NULL) {
return;
}
writerCtxDestroy(tw->ctx);
free(tw);
}
void tfileWriterDestroy(TFileWriter* tw) {
if (tw == NULL) { return; }
if (tw == NULL) {
return;
}
writerCtxDestroy(tw->ctx);
free(tw);
......@@ -317,29 +339,35 @@ void tfileWriterDestroy(TFileWriter* tw) {
IndexTFile* indexTFileCreate(const char* path) {
IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
if (tfile == NULL) { return NULL; }
if (tfile == NULL) {
return NULL;
}
tfile->cache = tfileCacheCreate(path);
return tfile;
}
void IndexTFileDestroy(IndexTFile* tfile) {
free(tfile);
}
void IndexTFileDestroy(IndexTFile* tfile) { free(tfile); }
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
int ret = -1;
if (tfile == NULL) { return ret; }
if (tfile == NULL) {
return ret;
}
IndexTFile* pTfile = (IndexTFile*)tfile;
SIndexTerm* term = query->term;
TFileCacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
TFileCacheKey key = {
.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
if (reader == NULL) {
return 0;
}
return tfileReaderSearch(reader, query, result);
}
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
// TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName, .version =
// 1};
// TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName =
// term->nColName, .version = 1};
return 0;
}
......@@ -353,7 +381,9 @@ static bool tfileIteratorNext(Iterate* iiter) {
TFileFstIter* tIter = iiter->iter;
StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL);
if (rt == NULL) { return false; }
if (rt == NULL) {
return false;
}
int32_t sz = 0;
char* ch = (char*)fstSliceData(&rt->data, &sz);
......@@ -364,20 +394,22 @@ static bool tfileIteratorNext(Iterate* iiter) {
swsResultDestroy(rt);
// set up iterate value
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; }
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) {
return false;
}
iv->colVal = colVal;
// std::string key(ch, sz);
}
static IterateValue* tifileIterateGetValue(Iterate* iter) {
return &iter->val;
}
static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
TFileFstIter* tIter = calloc(1, sizeof(Iterate));
if (tIter == NULL) { return NULL; }
if (tIter == NULL) {
return NULL;
}
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
tIter->fb = fstSearch(reader->fst, tIter->ctx);
tIter->st = streamBuilderIntoStream(tIter->fb);
......@@ -389,14 +421,18 @@ Iterate* tfileIteratorCreate(TFileReader* reader) {
Iterate* iter = calloc(1, sizeof(Iterate));
iter->iter = tfileFstIteratorCreate(reader);
if (iter->iter == NULL) { return NULL; }
if (iter->iter == NULL) {
return NULL;
}
iter->next = tfileIteratorNext;
iter->getValue = tifileIterateGetValue;
return iter;
}
void tfileIteratorDestroy(Iterate* iter) {
if (iter == NULL) { return; }
if (iter == NULL) {
return;
}
IterateValue* iv = &iter->val;
iterateValueDestroy(iv, true);
......@@ -409,14 +445,18 @@ void tfileIteratorDestroy(Iterate* iter) {
}
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) {
if (tf == NULL) { return NULL; }
if (tf == NULL) {
return NULL;
}
TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
return tfileCacheGet(tf->cache, &key);
}
static int tfileStrCompare(const void* a, const void* b) {
int ret = strcmp((char*)a, (char*)b);
if (ret == 0) { return ret; }
if (ret == 0) {
return ret;
}
return ret < 0 ? -1 : 1;
}
......@@ -431,13 +471,17 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
TFileValue* tfileValueCreate(char* val) {
TFileValue* tf = calloc(1, sizeof(TFileValue));
if (tf == NULL) { return NULL; }
if (tf == NULL) {
return NULL;
}
tf->tableId = taosArrayInit(32, sizeof(uint64_t));
return tf;
}
int tfileValuePush(TFileValue* tf, uint64_t val) {
if (tf == NULL) { return -1; }
if (tf == NULL) {
return -1;
}
taosArrayPush(tf->tableId, &val);
return 0;
}
......@@ -457,7 +501,9 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
tw->header.fstOffset = fstOffset;
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) {
return -1;
}
tw->offset += sizeof(fstOffset);
return 0;
}
......@@ -468,7 +514,9 @@ static int tfileWriteHeader(TFileWriter* writer) {
memcpy(buf, (char*)header, sizeof(buf));
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
if (sizeof(buf) != nwrite) { return -1; }
if (sizeof(buf) != nwrite) {
return -1;
}
writer->offset = nwrite;
return 0;
}
......@@ -502,7 +550,9 @@ static int tfileReaderLoadFst(TFileReader* reader) {
static int FST_MAX_SIZE = 16 * 1024;
char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE);
if (buf == NULL) { return -1; }
if (buf == NULL) {
return -1;
}
WriterCtx* ctx = reader->ctx;
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
......@@ -525,7 +575,9 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
int32_t total = sizeof(uint64_t) * nid;
char* buf = calloc(1, total);
if (buf == NULL) { return -1; }
if (buf == NULL) {
return -1;
}
nread = ctx->read(ctx, buf, total);
assert(total == nread);
......@@ -543,12 +595,16 @@ void tfileReaderRef(TFileReader* reader) {
void tfileReaderUnRef(TFileReader* reader) {
int ref = T_REF_DEC(reader);
if (ref == 0) { tfileReaderDestroy(reader); }
if (ref == 0) {
tfileReaderDestroy(reader);
}
}
static int tfileGetFileList(const char* path, SArray* result) {
DIR* dir = opendir(path);
if (NULL == dir) { return -1; }
if (NULL == dir) {
return -1;
}
struct dirent* entry;
while ((entry = readdir(dir)) != NULL) {
......@@ -576,7 +632,9 @@ static int tfileCompare(const void* a, const void* b) {
size_t bLen = strlen(bName);
int ret = strncmp(aName, bName, aLen > bLen ? aLen : bLen);
if (ret == 0) { return ret; }
if (ret == 0) {
return ret;
}
return ret < 0 ? -1 : 1;
}
// tfile name suid-colId-version.tindex
......
......@@ -2,7 +2,8 @@
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation.
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
......@@ -75,7 +76,9 @@ class FstReadMemory {
bool init() {
char* buf = (char*)calloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
if (nRead <= 0) { return false; }
if (nRead <= 0) {
return false;
}
_size = nRead;
_s = fstSliceCreate((uint8_t*)buf, _size);
_fst = fstCreate(&_s);
......@@ -179,7 +182,9 @@ void checkFstPerf() {
delete fw;
FstReadMemory* m = new FstReadMemory(1024 * 64);
if (m->init()) { printf("success to init fst read"); }
if (m->init()) {
printf("success to init fst read");
}
Performance_fstReadRecords(m);
delete m;
}
......@@ -283,7 +288,8 @@ class IndexEnv : public ::testing::Test {
// / {
// / std::string colName("tag1"), colVal("Hello world");
// / SIndexTerm* term =
// indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), / colVal.size());
// indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), /
// colVal.size());
// SIndexMultiTerm* terms = indexMultiTermCreate();
// indexMultiTermAdd(terms, term);
// / / for (size_t i = 0; i < 100; i++) {
......@@ -301,14 +307,16 @@ class IndexEnv : public ::testing::Test {
// / {
// / std::string colName("tag1"), colVal("Hello world");
// / SIndexTerm* term =
// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(),
// colVal.size());
// / indexMultiTermAdd(terms, term);
// /
// }
// / {
// / std::string colName("tag2"), colVal("Hello world");
// / SIndexTerm* term =
// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(),
// colVal.size());
// / indexMultiTermAdd(terms, term);
// /
// }
......@@ -327,7 +335,8 @@ class IndexEnv : public ::testing::Test {
class TFileObj {
public:
TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage") : path_(path), colName_(colName) {
TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage")
: path_(path), colName_(colName) {
colId_ = 10;
// Do Nothing
//
......@@ -337,7 +346,9 @@ class TFileObj {
tfileReaderDestroy(reader_);
reader_ = NULL;
}
if (writer_ == NULL) { InitWriter(); }
if (writer_ == NULL) {
InitWriter();
}
return tfileWriterPut(writer_, tv);
}
bool InitWriter() {
......@@ -377,8 +388,12 @@ class TFileObj {
return tfileReaderSearch(reader_, query, result);
}
~TFileObj() {
if (writer_) { tfileWriterDestroy(writer_); }
if (reader_) { tfileReaderDestroy(reader_); }
if (writer_) {
tfileWriterDestroy(writer_);
}
if (reader_) {
tfileReaderDestroy(reader_);
}
}
private:
......@@ -455,9 +470,10 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
}
taosArrayDestroy(data);
std::string colName("voltage");
std::string colVal("ab");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
std::string colName("voltage");
std::string colVal("ab");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
......@@ -525,54 +541,62 @@ TEST_F(IndexCacheEnv, cache_test) {
std::string colName("voltage");
{
std::string colVal("v1");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
}
{
std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
}
{
std::string colVal("v2");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
}
{
std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
}
{
std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
}
{
std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, othColId, version++, suid++);
}
{
std::string colVal("v4");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, othColId, version++, suid++);
}
{
std::string colVal("v4");
for (size_t i = 0; i < 10; i++) {
colVal[colVal.size() - 1] = 'a' + i;
SIndexTerm* term =
indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
}
}
coj->Debug();
// begin query
{
std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
STermValueType valType;
......@@ -582,8 +606,9 @@ TEST_F(IndexCacheEnv, cache_test) {
assert(taosArrayGetSize(ret) == 4);
}
{
std::string colVal("v2");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
std::string colVal("v2");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
STermValueType valType;
......@@ -592,3 +617,132 @@ TEST_F(IndexCacheEnv, cache_test) {
assert(taosArrayGetSize(ret) == 1);
}
}
class IndexObj {
public:
IndexObj() {
// opt
numOfWrite = 0;
numOfRead = 0;
indexInit();
}
int Init(const std::string& dir) {
taosRemoveDir(dir.c_str());
taosMkDir(dir.c_str());
int ret = indexOpen(&opts, dir.c_str(), &idx);
if (ret != 0) {
// opt
std::cout << "failed to open index: %s" << dir << std::endl;
}
return ret;
}
int Put(SIndexMultiTerm* fvs, uint64_t uid) {
numOfWrite += taosArrayGetSize(fvs);
return indexPut(idx, fvs, uid);
}
int Search(SIndexMultiTermQuery* multiQ, SArray* result) {
SArray* query = multiQ->query;
numOfRead = taosArrayGetSize(query);
return indexSearch(idx, multiQ, result);
}
void Debug() {
std::cout << "numOfWrite:" << numOfWrite << std::endl;
std::cout << "numOfRead:" << numOfRead << std::endl;
}
~IndexObj() {
indexClose(idx);
indexCleanUp();
}
private:
SIndexOpts opts;
SIndex* idx;
int numOfWrite;
int numOfRead;
};
class IndexEnv2 : public ::testing::Test {
protected:
virtual void SetUp() {
index = new IndexObj();
//
}
virtual void TearDown() {
// r
delete index;
}
IndexObj* index;
};
TEST_F(IndexEnv2, testIndexOpen) {
std::string path = "/tmp";
if (index->Init(path) != 0) {
std::cout << "failed to init index" << std::endl;
exit(1);
}
int targetSize = 100;
{
std::string colName("tag1"), colVal("Hello world");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < targetSize; i++) {
int tableId = i;
int ret = index->Put(terms, tableId);
assert(ret == 0);
}
indexMultiTermDestroy(terms);
}
{
size_t size = 100;
std::string colName("tag1"), colVal("hello world");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < size; i++) {
int tableId = i;
int ret = index->Put(terms, tableId);
assert(ret == 0);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("tag1"), colVal("Hello world");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
index->Search(mq, result);
assert(taosArrayGetSize(result) == targetSize);
}
}
TEST_F(IndexEnv2, testIndex_CachePut) {
std::string path = "/tmp";
if (index->Init(path) != 0) {
}
}
TEST_F(IndexEnv2, testIndexr_TFilePut) {
std::string path = "/tmp";
if (index->Init(path) != 0) {
}
}
TEST_F(IndexEnv2, testIndex_CacheSearch) {
std::string path = "/tmp";
if (index->Init(path) != 0) {
}
}
TEST_F(IndexEnv2, testIndex_TFileSearch) {
std::string path = "/tmp";
if (index->Init(path) != 0) {
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册