提交 d2c4a924 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode3

...@@ -37,6 +37,7 @@ typedef struct WriterCtx { ...@@ -37,6 +37,7 @@ typedef struct WriterCtx {
struct { struct {
int fd; int fd;
bool readOnly; bool readOnly;
char buf[256];
} file; } file;
struct { struct {
int32_t capa; int32_t capa;
...@@ -53,7 +54,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off ...@@ -53,7 +54,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
static int writeCtxDoFlush(WriterCtx* ctx); static int writeCtxDoFlush(WriterCtx* ctx);
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity); WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity);
void writerCtxDestroy(WriterCtx* w); void writerCtxDestroy(WriterCtx* w, bool remove);
typedef uint32_t CheckSummer; typedef uint32_t CheckSummer;
......
...@@ -107,6 +107,7 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* ...@@ -107,6 +107,7 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader*
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName); TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName);
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName);
TFileReader* tfileReaderCreate(WriterCtx* ctx); TFileReader* tfileReaderCreate(WriterCtx* ctx);
void tfileReaderDestroy(TFileReader* reader); void tfileReaderDestroy(TFileReader* reader);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result); int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result);
...@@ -114,10 +115,10 @@ void tfileReaderRef(TFileReader* reader); ...@@ -114,10 +115,10 @@ void tfileReaderRef(TFileReader* reader);
void tfileReaderUnRef(TFileReader* reader); void tfileReaderUnRef(TFileReader* reader);
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t type); TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t type);
void tfileWriteClose(TFileWriter* tw); void tfileWriterClose(TFileWriter* tw);
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header); TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header);
void tfileWriterDestroy(TFileWriter* tw); void tfileWriterDestroy(TFileWriter* tw);
int tfileWriterPut(TFileWriter* tw, void* data); int tfileWriterPut(TFileWriter* tw, void* data, bool order);
int tfileWriterFinish(TFileWriter* tw); int tfileWriterFinish(TFileWriter* tw);
// //
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#endif #endif
#define INDEX_NUM_OF_THREADS 4 #define INDEX_NUM_OF_THREADS 4
#define INDEX_QUEUE_SIZE 4 #define INDEX_QUEUE_SIZE 200
void* indexQhandle = NULL; void* indexQhandle = NULL;
...@@ -52,18 +52,17 @@ typedef struct SIdxColInfo { ...@@ -52,18 +52,17 @@ typedef struct SIdxColInfo {
static pthread_once_t isInit = PTHREAD_ONCE_INIT; static pthread_once_t isInit = PTHREAD_ONCE_INIT;
// static void indexInit(); // static void indexInit();
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result); static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
static void indexInterResultsDestroy(SArray* results); static void indexInterResultsDestroy(SArray* results);
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult); static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult);
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
// pthread_once(&isInit, indexInit); // pthread_once(&isInit, indexInit);
SIndex* sIdx = calloc(1, sizeof(SIndex)); SIndex* sIdx = calloc(1, sizeof(SIndex));
if (sIdx == NULL) { if (sIdx == NULL) { return -1; }
return -1;
}
#ifdef USE_LUCENE #ifdef USE_LUCENE
index_t* index = index_open(path); index_t* index = index_open(path);
...@@ -99,9 +98,7 @@ void indexClose(SIndex* sIdx) { ...@@ -99,9 +98,7 @@ void indexClose(SIndex* sIdx) {
void* iter = taosHashIterate(sIdx->colObj, NULL); void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) { while (iter) {
IndexCache** pCache = iter; IndexCache** pCache = iter;
if (*pCache) { if (*pCache) { indexCacheUnRef(*pCache); }
indexCacheUnRef(*pCache);
}
iter = taosHashIterate(sIdx->colObj, iter); iter = taosHashIterate(sIdx->colObj, iter);
} }
taosHashCleanup(sIdx->colObj); taosHashCleanup(sIdx->colObj);
...@@ -147,9 +144,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -147,9 +144,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName); IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
assert(*cache != NULL); assert(*cache != NULL);
int ret = indexCachePut(*cache, p, uid); int ret = indexCachePut(*cache, p, uid);
if (ret != 0) { if (ret != 0) { return ret; }
return ret;
}
} }
#endif #endif
...@@ -179,9 +174,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result ...@@ -179,9 +174,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
int tsz = 0; int tsz = 0;
index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz); index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz);
for (int i = 0; i < tsz; i++) { for (int i = 0; i < tsz; i++) { taosArrayPush(result, &tResult[i]); }
taosArrayPush(result, &tResult[i]);
}
for (int i = 0; i < nQuery; i++) { for (int i = 0; i < nQuery; i++) {
free(fields[i]); free(fields[i]);
...@@ -238,9 +231,7 @@ void indexOptsDestroy(SIndexOpts* opts) { ...@@ -238,9 +231,7 @@ void indexOptsDestroy(SIndexOpts* opts) {
*/ */
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) { SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery)); SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
if (p == NULL) { if (p == NULL) { return NULL; }
return NULL;
}
p->opera = opera; p->opera = opera;
p->query = taosArrayInit(4, sizeof(SIndexTermQuery)); p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
return p; return p;
...@@ -262,9 +253,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde ...@@ -262,9 +253,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName, SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
int32_t nColName, const char* colVal, int32_t nColVal) { int32_t nColName, const char* colVal, int32_t nColVal) {
SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm))); SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm)));
if (t == NULL) { if (t == NULL) { return NULL; }
return NULL;
}
t->suid = suid; t->suid = suid;
t->operType = oper; t->operType = oper;
...@@ -317,7 +306,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result ...@@ -317,7 +306,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
*result = taosArrayInit(4, sizeof(uint64_t)); *result = taosArrayInit(4, sizeof(uint64_t));
// TODO: iterator mem and tidex // TODO: iterator mem and tidex
STermValueType s; STermValueType s = kTypeValue;
if (0 == indexCacheSearch(cache, query, *result, &s)) { if (0 == indexCacheSearch(cache, query, *result, &s)) {
if (s == kTypeDeletion) { if (s == kTypeDeletion) {
indexInfo("col: %s already drop by other opera", term->colName); indexInfo("col: %s already drop by other opera", term->colName);
...@@ -336,9 +325,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result ...@@ -336,9 +325,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
return 0; return 0;
} }
static void indexInterResultsDestroy(SArray* results) { static void indexInterResultsDestroy(SArray* results) {
if (results == NULL) { if (results == NULL) { return; }
return;
}
size_t sz = taosArrayGetSize(results); size_t sz = taosArrayGetSize(results);
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
...@@ -368,23 +355,43 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType ...@@ -368,23 +355,43 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
return 0; return 0;
} }
int indexFlushCacheTFile(SIndex* sIdx, void* cache) { static void indexMergeSameKey(SArray* result, TFileValue* tv) {
if (sIdx == NULL) { int32_t sz = result ? taosArrayGetSize(result) : 0;
return -1; if (sz > 0) {
// TODO(yihao): remove duplicate tableid
TFileValue* lv = taosArrayGetP(result, sz - 1);
if (strcmp(lv->colVal, tv->colVal) == 0) {
taosArrayAddAll(lv->tableId, tv->tableId);
tfileValueDestroy(tv);
} else {
taosArrayPush(result, &tv);
}
} else {
taosArrayPush(result, &tv);
} }
}
static void indexDestroyTempResult(SArray* result) {
int32_t sz = result ? taosArrayGetSize(result) : 0;
for (size_t i = 0; i < sz; i++) {
TFileValue* tv = taosArrayGetP(result, i);
tfileValueDestroy(tv);
}
taosArrayDestroy(result);
}
int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
if (sIdx == NULL) { return -1; }
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
IndexCache* pCache = (IndexCache*)cache; IndexCache* pCache = (IndexCache*)cache;
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName); TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName);
// handle flush // handle flush
Iterate* cacheIter = indexCacheIteratorCreate(pCache); Iterate* cacheIter = indexCacheIteratorCreate(pCache);
Iterate* tfileIter = tfileIteratorCreate(pReader); Iterate* tfileIter = tfileIteratorCreate(pReader);
SArray* result = taosArrayInit(1024, sizeof(void*)); SArray* result = taosArrayInit(1024, sizeof(void*));
bool cn = cacheIter->next(cacheIter); bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
bool tn = tfileIter->next(tfileIter); bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
while (cn == true && tn == true) { while (cn == true && tn == true) {
IterateValue* cv = cacheIter->getValue(cacheIter); IterateValue* cv = cacheIter->getValue(cacheIter);
IterateValue* tv = tfileIter->getValue(tfileIter); IterateValue* tv = tfileIter->getValue(tfileIter);
...@@ -395,7 +402,7 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) { ...@@ -395,7 +402,7 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
TFileValue* tfv = tfileValueCreate(cv->colVal); TFileValue* tfv = tfileValueCreate(cv->colVal);
taosArrayAddAll(tfv->tableId, cv->val); taosArrayAddAll(tfv->tableId, cv->val);
taosArrayAddAll(tfv->tableId, tv->val); taosArrayAddAll(tfv->tableId, tv->val);
taosArrayPush(result, &tfv); indexMergeSameKey(result, tfv);
cn = cacheIter->next(cacheIter); cn = cacheIter->next(cacheIter);
tn = tfileIter->next(tfileIter); tn = tfileIter->next(tfileIter);
...@@ -403,13 +410,15 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) { ...@@ -403,13 +410,15 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
} else if (comp < 0) { } else if (comp < 0) {
TFileValue* tfv = tfileValueCreate(cv->colVal); TFileValue* tfv = tfileValueCreate(cv->colVal);
taosArrayAddAll(tfv->tableId, cv->val); taosArrayAddAll(tfv->tableId, cv->val);
taosArrayPush(result, &tfv);
indexMergeSameKey(result, tfv);
// copy to final Result; // copy to final Result;
cn = cacheIter->next(cacheIter); cn = cacheIter->next(cacheIter);
} else { } else {
TFileValue* tfv = tfileValueCreate(tv->colVal); TFileValue* tfv = tfileValueCreate(tv->colVal);
taosArrayPush(result, &tfv);
taosArrayAddAll(tfv->tableId, tv->val); taosArrayAddAll(tfv->tableId, tv->val);
indexMergeSameKey(result, tfv);
// copy to final result // copy to final result
tn = tfileIter->next(tfileIter); tn = tfileIter->next(tfileIter);
} }
...@@ -418,33 +427,24 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) { ...@@ -418,33 +427,24 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
IterateValue* cv = cacheIter->getValue(cacheIter); IterateValue* cv = cacheIter->getValue(cacheIter);
TFileValue* tfv = tfileValueCreate(cv->colVal); TFileValue* tfv = tfileValueCreate(cv->colVal);
taosArrayAddAll(tfv->tableId, cv->val); taosArrayAddAll(tfv->tableId, cv->val);
taosArrayPush(result, &tfv); indexMergeSameKey(result, tfv);
cn = cacheIter->next(cacheIter); cn = cacheIter->next(cacheIter);
} }
while (tn == true) { while (tn == true) {
IterateValue* tv = tfileIter->getValue(tfileIter); IterateValue* tv = tfileIter->getValue(tfileIter);
TFileValue* tfv = tfileValueCreate(tv->colVal); TFileValue* tfv = tfileValueCreate(tv->colVal);
if (tv->val == NULL) {
// HO
printf("NO....");
}
taosArrayAddAll(tfv->tableId, tv->val); taosArrayAddAll(tfv->tableId, tv->val);
taosArrayPush(result, &tfv); indexMergeSameKey(result, tfv);
tn = tfileIter->next(tfileIter); tn = tfileIter->next(tfileIter);
} }
int ret = indexGenTFile(sIdx, pCache, result);
int32_t version = CACHE_VERSION(pCache); indexDestroyTempResult(result);
uint8_t colType = pCache->type;
TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, pCache->colName, colType);
if (tw == NULL) {
indexError("faile to open file to write");
} else {
int ret = tfileWriterPut(tw, result);
if (ret != 0) {
indexError("faile to write into tindex ");
}
}
// not free later, just put int table cache
indexCacheDestroyImm(pCache); indexCacheDestroyImm(pCache);
tfileWriteClose(tw);
indexCacheIteratorDestroy(cacheIter); indexCacheIteratorDestroy(cacheIter);
tfileIteratorDestroy(tfileIter); tfileIteratorDestroy(tfileIter);
...@@ -455,9 +455,45 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) { ...@@ -455,9 +455,45 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
void iterateValueDestroy(IterateValue* value, bool destroy) { void iterateValueDestroy(IterateValue* value, bool destroy) {
if (destroy) { if (destroy) {
taosArrayDestroy(value->val); taosArrayDestroy(value->val);
value->val = NULL;
} else { } else {
taosArrayClear(value->val); if (value->val != NULL) { taosArrayClear(value->val); }
} }
free(value->colVal); // free(value->colVal);
value->colVal = NULL; value->colVal = NULL;
} }
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
int32_t version = CACHE_VERSION(cache);
uint8_t colType = cache->type;
TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, cache->colName, colType);
if (tw == NULL) {
indexError("failed to open file to write");
return -1;
}
int ret = tfileWriterPut(tw, batch, true);
if (ret != 0) {
indexError("failed to write into tindex ");
goto END;
}
tfileWriterClose(tw);
TFileReader* reader = tfileReaderOpen(sIdx->path, sIdx->suid, version, cache->colName);
char buf[128] = {0};
TFileHeader* header = &reader->header;
TFileCacheKey key = {.suid = header->suid,
.colName = header->colName,
.nColName = strlen(header->colName),
.colType = header->colType};
pthread_mutex_lock(&sIdx->mtx);
IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
tfileCachePut(ifile->cache, &key, reader);
pthread_mutex_unlock(&sIdx->mtx);
return ret;
END:
tfileWriterClose(tw);
}
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later #define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 1000000 #define MEM_TERM_LIMIT 100
// ref index_cache.h:22 // ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \ //#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
...@@ -86,25 +86,26 @@ void indexCacheDestroySkiplist(SSkipList* slt) { ...@@ -86,25 +86,26 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter); SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
if (ct != NULL) { if (ct != NULL) {}
}
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
tSkipListDestroy(slt); tSkipListDestroy(slt);
} }
void indexCacheDestroyImm(IndexCache* cache) { void indexCacheDestroyImm(IndexCache* cache) {
if (cache == NULL) { return; }
MemTable* tbl = NULL; MemTable* tbl = NULL;
pthread_mutex_lock(&cache->mtx); pthread_mutex_lock(&cache->mtx);
tbl = cache->imm; tbl = cache->imm;
cache->imm = NULL; // or throw int bg thread cache->imm = NULL; // or throw int bg thread
pthread_mutex_unlock(&cache->mtx); pthread_mutex_unlock(&cache->mtx);
indexMemUnRef(tbl);
indexMemUnRef(tbl); indexMemUnRef(tbl);
} }
void indexCacheDestroy(void* cache) { void indexCacheDestroy(void* cache) {
IndexCache* pCache = cache; IndexCache* pCache = cache;
if (pCache == NULL) { if (pCache == NULL) { return; }
return;
}
indexMemUnRef(pCache->mem); indexMemUnRef(pCache->mem);
indexMemUnRef(pCache->imm); indexMemUnRef(pCache->imm);
free(pCache->colName); free(pCache->colName);
...@@ -114,9 +115,11 @@ void indexCacheDestroy(void* cache) { ...@@ -114,9 +115,11 @@ void indexCacheDestroy(void* cache) {
Iterate* indexCacheIteratorCreate(IndexCache* cache) { Iterate* indexCacheIteratorCreate(IndexCache* cache) {
Iterate* iiter = calloc(1, sizeof(Iterate)); Iterate* iiter = calloc(1, sizeof(Iterate));
if (iiter == NULL) { if (iiter == NULL) { return NULL; }
return NULL;
} pthread_mutex_lock(&cache->mtx);
indexMemRef(cache->imm);
MemTable* tbl = cache->imm; MemTable* tbl = cache->imm;
iiter->val.val = taosArrayInit(1, sizeof(uint64_t)); iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
...@@ -124,12 +127,12 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) { ...@@ -124,12 +127,12 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
iiter->next = indexCacheIteratorNext; iiter->next = indexCacheIteratorNext;
iiter->getValue = indexCacheIteratorGetValue; iiter->getValue = indexCacheIteratorGetValue;
pthread_mutex_unlock(&cache->mtx);
return iiter; return iiter;
} }
void indexCacheIteratorDestroy(Iterate* iter) { void indexCacheIteratorDestroy(Iterate* iter) {
if (iter == NULL) { if (iter == NULL) { return; }
return;
}
tSkipListDestroyIter(iter->iter); tSkipListDestroyIter(iter->iter);
iterateValueDestroy(&iter->val, true); iterateValueDestroy(&iter->val, true);
free(iter); free(iter);
...@@ -155,6 +158,7 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { ...@@ -155,6 +158,7 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
taosMsleep(50); taosMsleep(50);
pthread_mutex_lock(&cache->mtx); pthread_mutex_lock(&cache->mtx);
} else { } else {
indexCacheRef(cache);
cache->imm = cache->mem; cache->imm = cache->mem;
cache->mem = indexInternalCacheCreate(cache->type); cache->mem = indexInternalCacheCreate(cache->type);
cache->nTerm = 1; cache->nTerm = 1;
...@@ -166,17 +170,13 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { ...@@ -166,17 +170,13 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
} }
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
if (cache == NULL) { if (cache == NULL) { return -1; }
return -1;
}
IndexCache* pCache = cache; IndexCache* pCache = cache;
indexCacheRef(pCache); indexCacheRef(pCache);
// encode data // encode data
CacheTerm* ct = calloc(1, sizeof(CacheTerm)); CacheTerm* ct = calloc(1, sizeof(CacheTerm));
if (cache == NULL) { if (cache == NULL) { return -1; }
return -1;
}
// set up key // set up key
ct->colType = term->colType; ct->colType = term->colType;
ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1)); ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
...@@ -205,32 +205,11 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u ...@@ -205,32 +205,11 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
IndexCache* pCache = cache; IndexCache* pCache = cache;
return 0; return 0;
} }
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
if (cache == NULL) {
return -1;
}
IndexCache* pCache = cache;
SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType;
MemTable *mem = NULL, *imm = NULL;
pthread_mutex_lock(&pCache->mtx);
mem = pCache->mem;
imm = pCache->imm;
indexMemRef(mem);
indexMemRef(imm);
pthread_mutex_unlock(&pCache->mtx);
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
if (ct == NULL) {
return -1;
}
ct->colVal = calloc(1, sizeof(char) * (term->nColVal + 1));
memcpy(ct->colVal, term->colVal, term->nColVal);
ct->version = atomic_load_32(&pCache->version);
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) {
if (mem == NULL) { return 0; }
char* key = getIndexKey(ct); char* key = getIndexKey(ct);
// TODO handle multi situation later, and refactor
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter); SSkipListNode* node = tSkipListIterGet(iter);
...@@ -251,51 +230,55 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV ...@@ -251,51 +230,55 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
} }
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
cacheTermDestroy(ct); return 0;
// int32_t keyLen = CACHE_KEY_LEN(term); }
// char* buf = calloc(1, keyLen); int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
if (qtype == QUERY_TERM) { if (cache == NULL) { return -1; }
// IndexCache* pCache = cache;
} else if (qtype == QUERY_PREFIX) {
// MemTable *mem = NULL, *imm = NULL;
} else if (qtype == QUERY_SUFFIX) { pthread_mutex_lock(&pCache->mtx);
// mem = pCache->mem;
} else if (qtype == QUERY_REGEX) { imm = pCache->imm;
// indexMemRef(mem);
indexMemRef(imm);
pthread_mutex_unlock(&pCache->mtx);
SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType;
CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)};
int ret = indexQueryMem(mem, &ct, qtype, result, s);
if (ret == 0 && *s != kTypeDeletion) {
// continue search in imm
ret = indexQueryMem(imm, &ct, qtype, result, s);
} }
// cacheTermDestroy(ct);
indexMemUnRef(mem); indexMemUnRef(mem);
indexMemUnRef(imm); indexMemUnRef(imm);
return 0;
return ret;
} }
void indexCacheRef(IndexCache* cache) { void indexCacheRef(IndexCache* cache) {
if (cache == NULL) { if (cache == NULL) { return; }
return;
}
int ref = T_REF_INC(cache); int ref = T_REF_INC(cache);
UNUSED(ref); UNUSED(ref);
} }
void indexCacheUnRef(IndexCache* cache) { void indexCacheUnRef(IndexCache* cache) {
if (cache == NULL) { if (cache == NULL) { return; }
return;
}
int ref = T_REF_DEC(cache); int ref = T_REF_DEC(cache);
if (ref == 0) { if (ref == 0) { indexCacheDestroy(cache); }
indexCacheDestroy(cache);
}
} }
void indexMemRef(MemTable* tbl) { void indexMemRef(MemTable* tbl) {
if (tbl == NULL) { if (tbl == NULL) { return; }
return;
}
int ref = T_REF_INC(tbl); int ref = T_REF_INC(tbl);
UNUSED(ref); UNUSED(ref);
} }
void indexMemUnRef(MemTable* tbl) { void indexMemUnRef(MemTable* tbl) {
if (tbl == NULL) { if (tbl == NULL) { return; }
return;
}
int ref = T_REF_DEC(tbl); int ref = T_REF_DEC(tbl);
if (ref == 0) { if (ref == 0) {
SSkipList* slt = tbl->mem; SSkipList* slt = tbl->mem;
...@@ -305,9 +288,7 @@ void indexMemUnRef(MemTable* tbl) { ...@@ -305,9 +288,7 @@ void indexMemUnRef(MemTable* tbl) {
} }
static void cacheTermDestroy(CacheTerm* ct) { static void cacheTermDestroy(CacheTerm* ct) {
if (ct == NULL) { if (ct == NULL) { return; }
return;
}
free(ct->colVal); free(ct->colVal);
free(ct); free(ct);
} }
...@@ -322,9 +303,7 @@ static int32_t compareKey(const void* l, const void* r) { ...@@ -322,9 +303,7 @@ static int32_t compareKey(const void* l, const void* r) {
// compare colVal // compare colVal
int32_t cmp = strcmp(lt->colVal, rt->colVal); int32_t cmp = strcmp(lt->colVal, rt->colVal);
if (cmp == 0) { if (cmp == 0) { return rt->version - lt->version; }
return rt->version - lt->version;
}
return cmp; return cmp;
} }
...@@ -344,9 +323,7 @@ static void doMergeWork(SSchedMsg* msg) { ...@@ -344,9 +323,7 @@ static void doMergeWork(SSchedMsg* msg) {
} }
static bool indexCacheIteratorNext(Iterate* itera) { static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter; SSkipListIterator* iter = itera->iter;
if (iter == NULL) { if (iter == NULL) { return false; }
return false;
}
IterateValue* iv = &itera->val; IterateValue* iv = &itera->val;
iterateValueDestroy(iv, false); iterateValueDestroy(iv, false);
...@@ -356,7 +333,8 @@ static bool indexCacheIteratorNext(Iterate* itera) { ...@@ -356,7 +333,8 @@ static bool indexCacheIteratorNext(Iterate* itera) {
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
iv->type = ct->operaType; iv->type = ct->operaType;
iv->colVal = ct->colVal; iv->colVal = calloc(1, strlen(ct->colVal) + 1);
memcpy(iv->colVal, ct->colVal, strlen(ct->colVal));
taosArrayPush(iv->val, &ct->uid); taosArrayPush(iv->val, &ct->uid);
} }
......
...@@ -17,7 +17,9 @@ ...@@ -17,7 +17,9 @@
StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) { StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) {
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue)); StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
if (nsv == NULL) { return NULL; } if (nsv == NULL) {
return NULL;
}
nsv->kind = kind; nsv->kind = kind;
nsv->type = ty; nsv->type = ty;
...@@ -35,7 +37,9 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueTyp ...@@ -35,7 +37,9 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueTyp
} }
void startWithStateValueDestroy(void* val) { void startWithStateValueDestroy(void* val) {
StartWithStateValue* sv = (StartWithStateValue*)val; StartWithStateValue* sv = (StartWithStateValue*)val;
if (sv == NULL) { return; } if (sv == NULL) {
return;
}
if (sv->type == FST_INT) { if (sv->type == FST_INT) {
// //
...@@ -48,7 +52,9 @@ void startWithStateValueDestroy(void* val) { ...@@ -48,7 +52,9 @@ void startWithStateValueDestroy(void* val) {
} }
StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) { StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) {
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue)); StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
if (nsv == NULL) { return NULL; } if (nsv == NULL) {
return NULL;
}
nsv->kind = sv->kind; nsv->kind = sv->kind;
nsv->type = sv->type; nsv->type = sv->type;
...@@ -65,24 +71,12 @@ StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) { ...@@ -65,24 +71,12 @@ StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) {
} }
// iterate fst // iterate fst
static void* alwaysMatchStart(AutomationCtx* ctx) { static void* alwaysMatchStart(AutomationCtx* ctx) { return NULL; }
return NULL; static bool alwaysMatchIsMatch(AutomationCtx* ctx, void* state) { return true; }
} static bool alwaysMatchCanMatch(AutomationCtx* ctx, void* state) { return true; }
static bool alwaysMatchIsMatch(AutomationCtx* ctx, void* state) { static bool alwaysMatchWillAlwaysMatch(AutomationCtx* ctx, void* state) { return true; }
return true; static void* alwaysMatchAccpet(AutomationCtx* ctx, void* state, uint8_t byte) { return NULL; }
} static void* alwaysMatchAccpetEof(AutomationCtx* ctx, void* state) { return NULL; }
static bool alwaysMatchCanMatch(AutomationCtx* ctx, void* state) {
return true;
}
static bool alwaysMatchWillAlwaysMatch(AutomationCtx* ctx, void* state) {
return true;
}
static void* alwaysMatchAccpet(AutomationCtx* ctx, void* state, uint8_t byte) {
return NULL;
}
static void* alwaysMatchAccpetEof(AutomationCtx* ctx, void* state) {
return NULL;
}
// prefix query, impl later // prefix query, impl later
static void* prefixStart(AutomationCtx* ctx) { static void* prefixStart(AutomationCtx* ctx) {
...@@ -97,17 +91,20 @@ static bool prefixCanMatch(AutomationCtx* ctx, void* sv) { ...@@ -97,17 +91,20 @@ static bool prefixCanMatch(AutomationCtx* ctx, void* sv) {
StartWithStateValue* ssv = (StartWithStateValue*)sv; StartWithStateValue* ssv = (StartWithStateValue*)sv;
return ssv->val >= 0; return ssv->val >= 0;
} }
static bool prefixWillAlwaysMatch(AutomationCtx* ctx, void* state) { static bool prefixWillAlwaysMatch(AutomationCtx* ctx, void* state) { return true; }
return true;
}
static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) { static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) {
StartWithStateValue* ssv = (StartWithStateValue*)state; StartWithStateValue* ssv = (StartWithStateValue*)state;
if (ssv == NULL || ctx == NULL) { return NULL; } if (ssv == NULL || ctx == NULL) {
return NULL;
}
char* data = ctx->data; char* data = ctx->data;
if (ssv->kind == Done) { return startWithStateValueCreate(Done, FST_INT, &ssv->val); } if (ssv->kind == Done) {
return startWithStateValueCreate(Done, FST_INT, &ssv->val);
}
if ((strlen(data) > ssv->val) && data[ssv->val] == byte) { if ((strlen(data) > ssv->val) && data[ssv->val] == byte) {
int val = ssv->val + 1; int val = ssv->val + 1;
StartWithStateValue* nsv = startWithStateValueCreate(Running, FST_INT, &val); StartWithStateValue* nsv = startWithStateValueCreate(Running, FST_INT, &val);
if (prefixIsMatch(ctx, nsv)) { if (prefixIsMatch(ctx, nsv)) {
nsv->kind = Done; nsv->kind = Done;
...@@ -118,35 +115,22 @@ static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) { ...@@ -118,35 +115,22 @@ static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) {
} }
return NULL; return NULL;
} }
static void* prefixAcceptEof(AutomationCtx* ctx, void* state) { static void* prefixAcceptEof(AutomationCtx* ctx, void* state) { return NULL; }
return NULL;
}
// pattern query, impl later // pattern query, impl later
static void* patternStart(AutomationCtx* ctx) { static void* patternStart(AutomationCtx* ctx) { return NULL; }
return NULL; static bool patternIsMatch(AutomationCtx* ctx, void* data) { return true; }
} static bool patternCanMatch(AutomationCtx* ctx, void* data) { return true; }
static bool patternIsMatch(AutomationCtx* ctx, void* data) { static bool patternWillAlwaysMatch(AutomationCtx* ctx, void* state) { return true; }
return true;
}
static bool patternCanMatch(AutomationCtx* ctx, void* data) {
return true;
}
static bool patternWillAlwaysMatch(AutomationCtx* ctx, void* state) {
return true;
}
static void* patternAccept(AutomationCtx* ctx, void* state, uint8_t byte) { static void* patternAccept(AutomationCtx* ctx, void* state, uint8_t byte) { return NULL; }
return NULL;
}
static void* patternAcceptEof(AutomationCtx* ctx, void* state) { static void* patternAcceptEof(AutomationCtx* ctx, void* state) { return NULL; }
return NULL;
}
AutomationFunc automFuncs[] = { AutomationFunc automFuncs[] = {
{alwaysMatchStart, alwaysMatchIsMatch, alwaysMatchCanMatch, alwaysMatchWillAlwaysMatch, alwaysMatchAccpet, alwaysMatchAccpetEof}, {alwaysMatchStart, alwaysMatchIsMatch, alwaysMatchCanMatch, alwaysMatchWillAlwaysMatch, alwaysMatchAccpet,
alwaysMatchAccpetEof},
{prefixStart, prefixIsMatch, prefixCanMatch, prefixWillAlwaysMatch, prefixAccept, prefixAcceptEof}, {prefixStart, prefixIsMatch, prefixCanMatch, prefixWillAlwaysMatch, prefixAccept, prefixAcceptEof},
{patternStart, patternIsMatch, patternCanMatch, patternWillAlwaysMatch, patternAccept, patternAcceptEof} {patternStart, patternIsMatch, patternCanMatch, patternWillAlwaysMatch, patternAccept, patternAcceptEof}
// add more search type // add more search type
...@@ -154,7 +138,9 @@ AutomationFunc automFuncs[] = { ...@@ -154,7 +138,9 @@ AutomationFunc automFuncs[] = {
AutomationCtx* automCtxCreate(void* data, AutomationType atype) { AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
AutomationCtx* ctx = calloc(1, sizeof(AutomationCtx)); AutomationCtx* ctx = calloc(1, sizeof(AutomationCtx));
if (ctx == NULL) { return NULL; } if (ctx == NULL) {
return NULL;
}
StartWithStateValue* sv = NULL; StartWithStateValue* sv = NULL;
if (atype == AUTOMATION_ALWAYS) { if (atype == AUTOMATION_ALWAYS) {
...@@ -170,11 +156,14 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) { ...@@ -170,11 +156,14 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
// add more search type // add more search type
} }
char* src = (char*)data; char* dst = NULL;
size_t len = strlen(src); if (data != NULL) {
char* dst = (char*)malloc(len * sizeof(char) + 1); char* src = (char*)data;
memcpy(dst, src, len); size_t len = strlen(src);
dst[len] = 0; dst = (char*)malloc(len * sizeof(char) + 1);
memcpy(dst, src, len);
dst[len] = 0;
}
ctx->data = dst; ctx->data = dst;
ctx->type = atype; ctx->type = atype;
......
...@@ -73,6 +73,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int ...@@ -73,6 +73,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
} else { } else {
ctx->file.fd = tfOpenReadWrite(path); ctx->file.fd = tfOpenReadWrite(path);
} }
memcpy(ctx->file.buf, path, strlen(path));
if (ctx->file.fd < 0) { if (ctx->file.fd < 0) {
indexError("open file error %d", errno); indexError("open file error %d", errno);
goto END; goto END;
...@@ -95,11 +96,12 @@ END: ...@@ -95,11 +96,12 @@ END:
free(ctx); free(ctx);
return NULL; return NULL;
} }
void writerCtxDestroy(WriterCtx* ctx) { void writerCtxDestroy(WriterCtx* ctx, bool remove) {
if (ctx->type == TMemory) { if (ctx->type == TMemory) {
free(ctx->mem.buf); free(ctx->mem.buf);
} else { } else {
tfClose(ctx->file.fd); tfClose(ctx->file.fd);
if (remove) unlink(ctx->file.buf);
} }
free(ctx); free(ctx);
} }
...@@ -138,10 +140,8 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) ...@@ -138,10 +140,8 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len)
return nRead; return nRead;
} }
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; }
return 0; int fstCountingWriterFlush(FstCountingWriter* write) {
}
int fstCountingWriterFlush(FstCountingWriter* write) {
WriterCtx* ctx = write->wrt; WriterCtx* ctx = write->wrt;
ctx->flush(ctx); ctx->flush(ctx);
// write->wtr->flush // write->wtr->flush
......
...@@ -32,6 +32,7 @@ typedef struct TFileFstIter { ...@@ -32,6 +32,7 @@ typedef struct TFileFstIter {
#define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + sz * sizeof(uint64_t)) #define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + sz * sizeof(uint64_t))
static int tfileUidCompare(const void* a, const void* b);
static int tfileStrCompare(const void* a, const void* b); static int tfileStrCompare(const void* a, const void* b);
static int tfileValueCompare(const void* a, const void* b, const void* param); static int tfileValueCompare(const void* a, const void* b, const void* param);
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds); static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
...@@ -52,11 +53,16 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, ...@@ -52,11 +53,16 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId,
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version); static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf); static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
static TFileReader* tfileReaderCreateImpl(WriterCtx* ctx) {
TFileReader* reader = tfileReaderCreate(ctx);
tfileReaderRef(reader);
// tfileSerialCacheKey(&key, buf);
return reader;
}
TFileCache* tfileCacheCreate(const char* path) { TFileCache* tfileCacheCreate(const char* path) {
TFileCache* tcache = calloc(1, sizeof(TFileCache)); TFileCache* tcache = calloc(1, sizeof(TFileCache));
if (tcache == NULL) { if (tcache == NULL) { return NULL; }
return NULL;
}
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tcache->capacity = 64; tcache->capacity = 64;
...@@ -81,16 +87,13 @@ TFileCache* tfileCacheCreate(const char* path) { ...@@ -81,16 +87,13 @@ TFileCache* tfileCacheCreate(const char* path) {
goto End; goto End;
} }
TFileReader* reader = tfileReaderCreate(wc); char buf[128] = {0};
tfileReaderRef(reader); TFileReader* reader = tfileReaderCreateImpl(wc);
// loader fst and validate it
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
TFileCacheKey key = {.suid = header->suid, TFileCacheKey key = {.suid = header->suid,
.colName = header->colName, .colName = header->colName,
.nColName = strlen(header->colName), .nColName = strlen(header->colName),
.colType = header->colType}; .colType = header->colType};
char buf[128] = {0};
tfileSerialCacheKey(&key, buf); tfileSerialCacheKey(&key, buf);
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
} }
...@@ -102,9 +105,7 @@ End: ...@@ -102,9 +105,7 @@ End:
return NULL; return NULL;
} }
void tfileCacheDestroy(TFileCache* tcache) { void tfileCacheDestroy(TFileCache* tcache) {
if (tcache == NULL) { if (tcache == NULL) { return; }
return;
}
// free table cache // free table cache
TFileReader** reader = taosHashIterate(tcache->tableCache, NULL); TFileReader** reader = taosHashIterate(tcache->tableCache, NULL);
...@@ -125,9 +126,7 @@ TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) { ...@@ -125,9 +126,7 @@ TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) {
tfileSerialCacheKey(key, buf); tfileSerialCacheKey(key, buf);
TFileReader** reader = taosHashGet(tcache->tableCache, buf, strlen(buf)); TFileReader** reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
if (reader == NULL) { if (reader == NULL) { return NULL; }
return NULL;
}
tfileReaderRef(*reader); tfileReaderRef(*reader);
return *reader; return *reader;
...@@ -137,7 +136,7 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) ...@@ -137,7 +136,7 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader)
tfileSerialCacheKey(key, buf); tfileSerialCacheKey(key, buf);
// remove last version index reader // remove last version index reader
TFileReader** p = taosHashGet(tcache->tableCache, buf, strlen(buf)); TFileReader** p = taosHashGet(tcache->tableCache, buf, strlen(buf));
if (*p != NULL) { if (p != NULL) {
TFileReader* oldReader = *p; TFileReader* oldReader = *p;
taosHashRemove(tcache->tableCache, buf, strlen(buf)); taosHashRemove(tcache->tableCache, buf, strlen(buf));
tfileReaderUnRef(oldReader); tfileReaderUnRef(oldReader);
...@@ -149,9 +148,7 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) ...@@ -149,9 +148,7 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader)
} }
TFileReader* tfileReaderCreate(WriterCtx* ctx) { TFileReader* tfileReaderCreate(WriterCtx* ctx) {
TFileReader* reader = calloc(1, sizeof(TFileReader)); TFileReader* reader = calloc(1, sizeof(TFileReader));
if (reader == NULL) { if (reader == NULL) { return NULL; }
return NULL;
}
// T_REF_INC(reader); // T_REF_INC(reader);
reader->ctx = ctx; reader->ctx = ctx;
...@@ -172,12 +169,10 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { ...@@ -172,12 +169,10 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
return reader; return reader;
} }
void tfileReaderDestroy(TFileReader* reader) { void tfileReaderDestroy(TFileReader* reader) {
if (reader == NULL) { if (reader == NULL) { return; }
return;
}
// T_REF_INC(reader); // T_REF_INC(reader);
fstDestroy(reader->fst); fstDestroy(reader->fst);
writerCtxDestroy(reader->ctx); writerCtxDestroy(reader->ctx, true);
free(reader); free(reader);
} }
...@@ -216,7 +211,8 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c ...@@ -216,7 +211,8 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c
char fullname[256] = {0}; char fullname[256] = {0};
snprintf(fullname, sizeof(fullname), "%s/%s", path, filename); snprintf(fullname, sizeof(fullname), "%s/%s", path, filename);
WriterCtx* wcx = writerCtxCreate(TFile, fullname, true, 1024 * 1024); WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024);
if (wcx == NULL) { return NULL; }
TFileHeader tfh = {0}; TFileHeader tfh = {0};
tfh.suid = suid; tfh.suid = suid;
...@@ -226,6 +222,21 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c ...@@ -226,6 +222,21 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c
return tfileWriterCreate(wcx, &tfh); return tfileWriterCreate(wcx, &tfh);
} }
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName) {
char filename[128] = {0};
int32_t coldId = 1;
tfileGenFileName(filename, suid, coldId, version);
char fullname[256] = {0};
snprintf(fullname, sizeof(fullname), "%s/%s", path, filename);
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024);
if (wc == NULL) { return NULL; }
TFileReader* reader = tfileReaderCreateImpl(wc);
return reader;
// tfileSerialCacheKey(&key, buf);
}
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
// char pathBuf[128] = {0}; // char pathBuf[128] = {0};
// sprintf(pathBuf, "%s/% " PRIu64 "-%d-%d.tindex", path, suid, colId, version); // sprintf(pathBuf, "%s/% " PRIu64 "-%d-%d.tindex", path, suid, colId, version);
...@@ -249,28 +260,29 @@ TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { ...@@ -249,28 +260,29 @@ TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
return tw; return tw;
} }
int tfileWriterPut(TFileWriter* tw, void* data) { int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
// sort by coltype and write to tindex // sort by coltype and write to tindex
__compar_fn_t fn; if (order == false) {
__compar_fn_t fn;
int8_t colType = tw->header.colType; int8_t colType = tw->header.colType;
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) { if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
fn = tfileStrCompare; fn = tfileStrCompare;
} else { } else {
fn = getComparFunc(colType, 0); fn = getComparFunc(colType, 0);
}
taosArraySortPWithExt((SArray*)(data), tfileValueCompare, &fn);
} }
taosArraySortPWithExt((SArray*)(data), tfileValueCompare, &fn);
int32_t bufLimit = 4096, offset = 0; int32_t bufLimit = 64 * 4096, offset = 0;
char* buf = calloc(1, sizeof(char) * bufLimit); // char* buf = calloc(1, sizeof(char) * bufLimit);
char* p = buf; // char* p = buf;
int32_t sz = taosArrayGetSize((SArray*)data); int32_t sz = taosArrayGetSize((SArray*)data);
int32_t fstOffset = tw->offset; int32_t fstOffset = tw->offset;
// ugly code, refactor later // ugly code, refactor later
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
TFileValue* v = taosArrayGetP((SArray*)data, i); TFileValue* v = taosArrayGetP((SArray*)data, i);
// taosArrayRemoveDuplicate(v->tablId, tfileUidCompare, NULL);
int32_t tbsz = taosArrayGetSize(v->tableId); int32_t tbsz = taosArrayGetSize(v->tableId);
fstOffset += TF_TABLE_TATOAL_SIZE(tbsz); fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
} }
...@@ -282,30 +294,39 @@ int tfileWriterPut(TFileWriter* tw, void* data) { ...@@ -282,30 +294,39 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
int32_t tbsz = taosArrayGetSize(v->tableId); int32_t tbsz = taosArrayGetSize(v->tableId);
// check buf has enough space or not // check buf has enough space or not
int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz); int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz);
if (offset + ttsz > bufLimit) {
// batch write
tw->ctx->write(tw->ctx, buf, offset);
offset = 0;
memset(buf, 0, bufLimit);
p = buf;
}
// if (offset + ttsz >= bufLimit) {
// // batch write
// indexInfo("offset: %d, ttsz: %d", offset, ttsz);
// // std::cout << "offset: " << offset << std::endl;
// // std::cout << "ttsz:" << ttsz < < < std::endl;
// tw->ctx->write(tw->ctx, buf, offset);
// offset = 0;
// memset(buf, 0, bufLimit);
// p = buf;
//}
// if (ttsz >= bufLimit) {
//}
char* buf = calloc(1, ttsz * sizeof(char));
char* p = buf;
tfileSerialTableIdsToBuf(p, v->tableId); tfileSerialTableIdsToBuf(p, v->tableId);
offset += ttsz; tw->ctx->write(tw->ctx, buf, ttsz);
p = buf + offset; // offset += ttsz;
// p = buf + offset;
// set up value offset // set up value offset
v->offset = tw->offset; v->offset = tw->offset;
tw->offset += ttsz; tw->offset += ttsz;
free(buf);
} }
if (offset != 0) { // if (offset != 0) {
// write reversed data in buf to tindex // write reversed data in buf to tindex
tw->ctx->write(tw->ctx, buf, offset); // tw->ctx->write(tw->ctx, buf, offset);
} //}
tfree(buf); // tfree(buf);
tw->fb = fstBuilderCreate(tw->ctx, 0); tw->fb = fstBuilderCreate(tw->ctx, 0);
if (tw->fb == NULL) { if (tw->fb == NULL) {
tfileWriterDestroy(tw); tfileWriterClose(tw);
return -1; return -1;
} }
// write fst // write fst
...@@ -321,27 +342,20 @@ int tfileWriterPut(TFileWriter* tw, void* data) { ...@@ -321,27 +342,20 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
tw->fb = NULL; tw->fb = NULL;
return 0; return 0;
} }
void tfileWriteClose(TFileWriter* tw) { void tfileWriterClose(TFileWriter* tw) {
if (tw == NULL) { if (tw == NULL) { return; }
return; writerCtxDestroy(tw->ctx, false);
}
writerCtxDestroy(tw->ctx);
free(tw); free(tw);
} }
void tfileWriterDestroy(TFileWriter* tw) { void tfileWriterDestroy(TFileWriter* tw) {
if (tw == NULL) { if (tw == NULL) { return; }
return; writerCtxDestroy(tw->ctx, false);
}
writerCtxDestroy(tw->ctx);
free(tw); free(tw);
} }
IndexTFile* indexTFileCreate(const char* path) { IndexTFile* indexTFileCreate(const char* path) {
IndexTFile* tfile = calloc(1, sizeof(IndexTFile)); IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
if (tfile == NULL) { if (tfile == NULL) { return NULL; }
return NULL;
}
tfile->cache = tfileCacheCreate(path); tfile->cache = tfileCacheCreate(path);
return tfile; return tfile;
...@@ -350,18 +364,14 @@ void IndexTFileDestroy(IndexTFile* tfile) { free(tfile); } ...@@ -350,18 +364,14 @@ void IndexTFileDestroy(IndexTFile* tfile) { free(tfile); }
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
int ret = -1; int ret = -1;
if (tfile == NULL) { if (tfile == NULL) { return ret; }
return ret;
}
IndexTFile* pTfile = (IndexTFile*)tfile; IndexTFile* pTfile = (IndexTFile*)tfile;
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
TFileCacheKey key = { TFileCacheKey key = {
.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName}; .suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
TFileReader* reader = tfileCacheGet(pTfile->cache, &key); TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
if (reader == NULL) { if (reader == NULL) { return 0; }
return 0;
}
return tfileReaderSearch(reader, query, result); return tfileReaderSearch(reader, query, result);
} }
...@@ -381,9 +391,7 @@ static bool tfileIteratorNext(Iterate* iiter) { ...@@ -381,9 +391,7 @@ static bool tfileIteratorNext(Iterate* iiter) {
TFileFstIter* tIter = iiter->iter; TFileFstIter* tIter = iiter->iter;
StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL); StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL);
if (rt == NULL) { if (rt == NULL) { return false; }
return false;
}
int32_t sz = 0; int32_t sz = 0;
char* ch = (char*)fstSliceData(&rt->data, &sz); char* ch = (char*)fstSliceData(&rt->data, &sz);
...@@ -391,12 +399,9 @@ static bool tfileIteratorNext(Iterate* iiter) { ...@@ -391,12 +399,9 @@ static bool tfileIteratorNext(Iterate* iiter) {
memcpy(colVal, ch, sz); memcpy(colVal, ch, sz);
offset = (uint64_t)(rt->out.out); offset = (uint64_t)(rt->out.out);
swsResultDestroy(rt); swsResultDestroy(rt);
// set up iterate value // set up iterate value
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; }
return false;
}
iv->colVal = colVal; iv->colVal = colVal;
...@@ -407,9 +412,8 @@ static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; } ...@@ -407,9 +412,8 @@ static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) { static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
TFileFstIter* tIter = calloc(1, sizeof(Iterate)); TFileFstIter* tIter = calloc(1, sizeof(Iterate));
if (tIter == NULL) { if (tIter == NULL) { return NULL; }
return NULL;
}
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS); tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
tIter->fb = fstSearch(reader->fst, tIter->ctx); tIter->fb = fstSearch(reader->fst, tIter->ctx);
tIter->st = streamBuilderIntoStream(tIter->fb); tIter->st = streamBuilderIntoStream(tIter->fb);
...@@ -418,21 +422,22 @@ static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) { ...@@ -418,21 +422,22 @@ static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
} }
Iterate* tfileIteratorCreate(TFileReader* reader) { Iterate* tfileIteratorCreate(TFileReader* reader) {
Iterate* iter = calloc(1, sizeof(Iterate)); if (reader == NULL) { return NULL; }
Iterate* iter = calloc(1, sizeof(Iterate));
iter->iter = tfileFstIteratorCreate(reader); iter->iter = tfileFstIteratorCreate(reader);
if (iter->iter == NULL) { if (iter->iter == NULL) {
free(iter);
return NULL; return NULL;
} }
iter->next = tfileIteratorNext; iter->next = tfileIteratorNext;
iter->getValue = tifileIterateGetValue; iter->getValue = tifileIterateGetValue;
iter->val.val = taosArrayInit(1, sizeof(uint64_t));
return iter; return iter;
} }
void tfileIteratorDestroy(Iterate* iter) { void tfileIteratorDestroy(Iterate* iter) {
if (iter == NULL) { if (iter == NULL) { return; }
return;
}
IterateValue* iv = &iter->val; IterateValue* iv = &iter->val;
iterateValueDestroy(iv, true); iterateValueDestroy(iv, true);
...@@ -445,18 +450,19 @@ void tfileIteratorDestroy(Iterate* iter) { ...@@ -445,18 +450,19 @@ void tfileIteratorDestroy(Iterate* iter) {
} }
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) { TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) {
if (tf == NULL) { if (tf == NULL) { return NULL; }
return NULL;
}
TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
return tfileCacheGet(tf->cache, &key); return tfileCacheGet(tf->cache, &key);
} }
static int tfileUidCompare(const void* a, const void* b) {
uint64_t l = *(uint64_t*)a;
uint64_t r = *(uint64_t*)b;
return l - r;
}
static int tfileStrCompare(const void* a, const void* b) { static int tfileStrCompare(const void* a, const void* b) {
int ret = strcmp((char*)a, (char*)b); int ret = strcmp((char*)a, (char*)b);
if (ret == 0) { if (ret == 0) { return ret; }
return ret;
}
return ret < 0 ? -1 : 1; return ret < 0 ? -1 : 1;
} }
...@@ -471,17 +477,13 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) { ...@@ -471,17 +477,13 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
TFileValue* tfileValueCreate(char* val) { TFileValue* tfileValueCreate(char* val) {
TFileValue* tf = calloc(1, sizeof(TFileValue)); TFileValue* tf = calloc(1, sizeof(TFileValue));
if (tf == NULL) { if (tf == NULL) { return NULL; }
return NULL; tf->colVal = val;
}
tf->tableId = taosArrayInit(32, sizeof(uint64_t)); tf->tableId = taosArrayInit(32, sizeof(uint64_t));
return tf; return tf;
} }
int tfileValuePush(TFileValue* tf, uint64_t val) { int tfileValuePush(TFileValue* tf, uint64_t val) {
if (tf == NULL) { if (tf == NULL) { return -1; }
return -1;
}
taosArrayPush(tf->tableId, &val); taosArrayPush(tf->tableId, &val);
return 0; return 0;
} }
...@@ -501,9 +503,7 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) { ...@@ -501,9 +503,7 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) { static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
int32_t fstOffset = offset + sizeof(tw->header.fstOffset); int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
tw->header.fstOffset = fstOffset; tw->header.fstOffset = fstOffset;
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
return -1;
}
tw->offset += sizeof(fstOffset); tw->offset += sizeof(fstOffset);
return 0; return 0;
} }
...@@ -514,9 +514,7 @@ static int tfileWriteHeader(TFileWriter* writer) { ...@@ -514,9 +514,7 @@ static int tfileWriteHeader(TFileWriter* writer) {
memcpy(buf, (char*)header, sizeof(buf)); memcpy(buf, (char*)header, sizeof(buf));
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf)); int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
if (sizeof(buf) != nwrite) { if (sizeof(buf) != nwrite) { return -1; }
return -1;
}
writer->offset = nwrite; writer->offset = nwrite;
return 0; return 0;
} }
...@@ -547,12 +545,10 @@ static int tfileReaderLoadHeader(TFileReader* reader) { ...@@ -547,12 +545,10 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
} }
static int tfileReaderLoadFst(TFileReader* reader) { static int tfileReaderLoadFst(TFileReader* reader) {
// current load fst into memory, refactor it later // current load fst into memory, refactor it later
static int FST_MAX_SIZE = 16 * 1024; static int FST_MAX_SIZE = 64 * 1024;
char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE); char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE);
if (buf == NULL) { if (buf == NULL) { return -1; }
return -1;
}
WriterCtx* ctx = reader->ctx; WriterCtx* ctx = reader->ctx;
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
...@@ -575,36 +571,33 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* ...@@ -575,36 +571,33 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
int32_t total = sizeof(uint64_t) * nid; int32_t total = sizeof(uint64_t) * nid;
char* buf = calloc(1, total); char* buf = calloc(1, total);
if (buf == NULL) { if (buf == NULL) { return -1; }
return -1;
}
nread = ctx->read(ctx, buf, total); nread = ctx->read(ctx, buf, total);
assert(total == nread); assert(total == nread);
for (int32_t i = 0; i < nid; i++) { for (int32_t i = 0; i < nid; i++) { taosArrayPush(result, (uint64_t*)buf + i); }
taosArrayPush(result, (uint64_t*)buf + i);
}
free(buf); free(buf);
return 0; return 0;
} }
void tfileReaderRef(TFileReader* reader) { void tfileReaderRef(TFileReader* reader) {
if (reader == NULL) { return; }
int ref = T_REF_INC(reader); int ref = T_REF_INC(reader);
UNUSED(ref); UNUSED(ref);
} }
void tfileReaderUnRef(TFileReader* reader) { void tfileReaderUnRef(TFileReader* reader) {
if (reader == NULL) { return; }
int ref = T_REF_DEC(reader); int ref = T_REF_DEC(reader);
if (ref == 0) { if (ref == 0) {
// do nothing
tfileReaderDestroy(reader); tfileReaderDestroy(reader);
} }
} }
static int tfileGetFileList(const char* path, SArray* result) { static int tfileGetFileList(const char* path, SArray* result) {
DIR* dir = opendir(path); DIR* dir = opendir(path);
if (NULL == dir) { if (NULL == dir) { return -1; }
return -1;
}
struct dirent* entry; struct dirent* entry;
while ((entry = readdir(dir)) != NULL) { while ((entry = readdir(dir)) != NULL) {
...@@ -632,9 +625,7 @@ static int tfileCompare(const void* a, const void* b) { ...@@ -632,9 +625,7 @@ static int tfileCompare(const void* a, const void* b) {
size_t bLen = strlen(bName); size_t bLen = strlen(bName);
int ret = strncmp(aName, bName, aLen > bLen ? aLen : bLen); int ret = strncmp(aName, bName, aLen > bLen ? aLen : bLen);
if (ret == 0) { if (ret == 0) { return ret; }
return ret;
}
return ret < 0 ? -1 : 1; return ret < 0 ? -1 : 1;
} }
// tfile name suid-colId-version.tindex // tfile name suid-colId-version.tindex
......
...@@ -12,9 +12,7 @@ ...@@ -12,9 +12,7 @@
#include "tskiplist.h" #include "tskiplist.h"
#include "tutil.h" #include "tutil.h"
void* callback(void* s) { void* callback(void* s) { return s; }
return s;
}
static std::string fileName = "/tmp/tindex.tindex"; static std::string fileName = "/tmp/tindex.tindex";
class FstWriter { class FstWriter {
...@@ -34,7 +32,7 @@ class FstWriter { ...@@ -34,7 +32,7 @@ class FstWriter {
fstBuilderFinish(_b); fstBuilderFinish(_b);
fstBuilderDestroy(_b); fstBuilderDestroy(_b);
writerCtxDestroy(_wc); writerCtxDestroy(_wc, false);
} }
private: private:
...@@ -102,7 +100,7 @@ class FstReadMemory { ...@@ -102,7 +100,7 @@ class FstReadMemory {
fstCountingWriterDestroy(_w); fstCountingWriterDestroy(_w);
fstDestroy(_fst); fstDestroy(_fst);
fstSliceDestroy(&_s); fstSliceDestroy(&_s);
writerCtxDestroy(_wc); writerCtxDestroy(_wc, false);
} }
private: private:
......
...@@ -57,7 +57,7 @@ class FstWriter { ...@@ -57,7 +57,7 @@ class FstWriter {
fstBuilderFinish(_b); fstBuilderFinish(_b);
fstBuilderDestroy(_b); fstBuilderDestroy(_b);
writerCtxDestroy(_wc); writerCtxDestroy(_wc, false);
} }
private: private:
...@@ -76,9 +76,7 @@ class FstReadMemory { ...@@ -76,9 +76,7 @@ class FstReadMemory {
bool init() { bool init() {
char* buf = (char*)calloc(1, sizeof(char) * _size); char* buf = (char*)calloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size); int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
if (nRead <= 0) { if (nRead <= 0) { return false; }
return false;
}
_size = nRead; _size = nRead;
_s = fstSliceCreate((uint8_t*)buf, _size); _s = fstSliceCreate((uint8_t*)buf, _size);
_fst = fstCreate(&_s); _fst = fstCreate(&_s);
...@@ -104,9 +102,7 @@ class FstReadMemory { ...@@ -104,9 +102,7 @@ class FstReadMemory {
StreamWithState* st = streamBuilderIntoStream(sb); StreamWithState* st = streamBuilderIntoStream(sb);
StreamWithStateResult* rt = NULL; StreamWithStateResult* rt = NULL;
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { result.push_back((uint64_t)(rt->out.out)); }
result.push_back((uint64_t)(rt->out.out));
}
return true; return true;
} }
bool SearchWithTimeCostUs(AutomationCtx* ctx, std::vector<uint64_t>& result) { bool SearchWithTimeCostUs(AutomationCtx* ctx, std::vector<uint64_t>& result) {
...@@ -120,7 +116,7 @@ class FstReadMemory { ...@@ -120,7 +116,7 @@ class FstReadMemory {
fstCountingWriterDestroy(_w); fstCountingWriterDestroy(_w);
fstDestroy(_fst); fstDestroy(_fst);
fstSliceDestroy(&_s); fstSliceDestroy(&_s);
writerCtxDestroy(_wc); writerCtxDestroy(_wc, true);
} }
private: private:
...@@ -182,9 +178,7 @@ void checkFstPerf() { ...@@ -182,9 +178,7 @@ void checkFstPerf() {
delete fw; delete fw;
FstReadMemory* m = new FstReadMemory(1024 * 64); FstReadMemory* m = new FstReadMemory(1024 * 64);
if (m->init()) { if (m->init()) { printf("success to init fst read"); }
printf("success to init fst read");
}
Performance_fstReadRecords(m); Performance_fstReadRecords(m);
delete m; delete m;
} }
...@@ -346,10 +340,8 @@ class TFileObj { ...@@ -346,10 +340,8 @@ class TFileObj {
tfileReaderDestroy(reader_); tfileReaderDestroy(reader_);
reader_ = NULL; reader_ = NULL;
} }
if (writer_ == NULL) { if (writer_ == NULL) { InitWriter(); }
InitWriter(); return tfileWriterPut(writer_, tv, false);
}
return tfileWriterPut(writer_, tv);
} }
bool InitWriter() { bool InitWriter() {
TFileHeader header; TFileHeader header;
...@@ -388,12 +380,8 @@ class TFileObj { ...@@ -388,12 +380,8 @@ class TFileObj {
return tfileReaderSearch(reader_, query, result); return tfileReaderSearch(reader_, query, result);
} }
~TFileObj() { ~TFileObj() {
if (writer_) { if (writer_) { tfileWriterDestroy(writer_); }
tfileWriterDestroy(writer_); if (reader_) { tfileReaderDestroy(reader_); }
}
if (reader_) {
tfileReaderDestroy(reader_);
}
} }
private: private:
...@@ -465,9 +453,7 @@ TEST_F(IndexTFileEnv, test_tfile_write) { ...@@ -465,9 +453,7 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
taosArrayPush(data, &v4); taosArrayPush(data, &v4);
fObj->Put(data); fObj->Put(data);
for (size_t i = 0; i < taosArrayGetSize(data); i++) { for (size_t i = 0; i < taosArrayGetSize(data); i++) { destroyTFileValue(taosArrayGetP(data, i)); }
destroyTFileValue(taosArrayGetP(data, i));
}
taosArrayDestroy(data); taosArrayDestroy(data);
std::string colName("voltage"); std::string colName("voltage");
...@@ -569,7 +555,8 @@ TEST_F(IndexCacheEnv, cache_test) { ...@@ -569,7 +555,8 @@ TEST_F(IndexCacheEnv, cache_test) {
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++); coj->Put(term, colId, version++, suid++);
} }
coj->Debug();
std::cout << "--------first----------" << std::endl;
{ {
std::string colVal("v3"); std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
...@@ -582,6 +569,8 @@ TEST_F(IndexCacheEnv, cache_test) { ...@@ -582,6 +569,8 @@ TEST_F(IndexCacheEnv, cache_test) {
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
coj->Put(term, othColId, version++, suid++); coj->Put(term, othColId, version++, suid++);
} }
coj->Debug();
std::cout << "--------second----------" << std::endl;
{ {
std::string colVal("v4"); std::string colVal("v4");
for (size_t i = 0; i < 10; i++) { for (size_t i = 0; i < 10; i++) {
...@@ -602,7 +591,7 @@ TEST_F(IndexCacheEnv, cache_test) { ...@@ -602,7 +591,7 @@ TEST_F(IndexCacheEnv, cache_test) {
STermValueType valType; STermValueType valType;
coj->Get(&query, colId, 10000, ret, &valType); coj->Get(&query, colId, 10000, ret, &valType);
// std::cout << "size : " << taosArrayGetSize(ret) << std::endl; std::cout << "size : " << taosArrayGetSize(ret) << std::endl;
assert(taosArrayGetSize(ret) == 4); assert(taosArrayGetSize(ret) == 4);
} }
{ {
...@@ -635,6 +624,20 @@ class IndexObj { ...@@ -635,6 +624,20 @@ class IndexObj {
} }
return ret; return ret;
} }
int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world",
size_t numOfTable = 100 * 10000) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < numOfTable; i++) {
int ret = Put(terms, i);
assert(ret == 0);
}
indexMultiTermDestroy(terms);
return numOfTable;
}
int Put(SIndexMultiTerm* fvs, uint64_t uid) { int Put(SIndexMultiTerm* fvs, uint64_t uid) {
numOfWrite += taosArrayGetSize(fvs); numOfWrite += taosArrayGetSize(fvs);
return indexPut(idx, fvs, uid); return indexPut(idx, fvs, uid);
...@@ -645,6 +648,17 @@ class IndexObj { ...@@ -645,6 +648,17 @@ class IndexObj {
return indexSearch(idx, multiQ, result); return indexSearch(idx, multiQ, result);
} }
int SearchOne(const std::string& colName, const std::string& colVal) {
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
if (Search(mq, result) == 0) { std::cout << "search one successfully" << std::endl; }
return taosArrayGetSize(result);
// assert(taosArrayGetSize(result) == targetSize);
}
void Debug() { void Debug() {
std::cout << "numOfWrite:" << numOfWrite << std::endl; std::cout << "numOfWrite:" << numOfWrite << std::endl;
std::cout << "numOfRead:" << numOfRead << std::endl; std::cout << "numOfRead:" << numOfRead << std::endl;
...@@ -665,12 +679,13 @@ class IndexObj { ...@@ -665,12 +679,13 @@ class IndexObj {
class IndexEnv2 : public ::testing::Test { class IndexEnv2 : public ::testing::Test {
protected: protected:
virtual void SetUp() { virtual void SetUp() {
tfInit();
index = new IndexObj(); index = new IndexObj();
// //
} }
virtual void TearDown() { virtual void TearDown() {
// r
delete index; delete index;
tfCleanup();
} }
IndexObj* index; IndexObj* index;
}; };
...@@ -722,27 +737,39 @@ TEST_F(IndexEnv2, testIndexOpen) { ...@@ -722,27 +737,39 @@ TEST_F(IndexEnv2, testIndexOpen) {
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
index->Search(mq, result); index->Search(mq, result);
assert(taosArrayGetSize(result) == targetSize); std::cout << "target size: " << taosArrayGetSize(result) << std::endl;
// assert(taosArrayGetSize(result) == targetSize);
} }
} }
TEST_F(IndexEnv2, testIndex_CachePut) {
TEST_F(IndexEnv2, testIndex_TrigeFlush) {
std::string path = "/tmp"; std::string path = "/tmp";
if (index->Init(path) != 0) { if (index->Init(path) != 0) {}
} int numOfTable = 100 * 10000;
index->WriteMillonData("tag1", "Hello world", numOfTable);
int target = index->SearchOne("tag1", "Hellow world");
assert(numOfTable == target);
} }
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
TEST_F(IndexEnv2, testIndexr_TFilePut) {
std::string path = "/tmp"; std::string path = "/tmp";
if (index->Init(path) != 0) { if (index->Init(path) != 0) {}
}
} }
TEST_F(IndexEnv2, testIndex_CacheSearch) { TEST_F(IndexEnv2, testIndex_multi_thread_write) {
std::string path = "/tmp"; std::string path = "/tmp";
if (index->Init(path) != 0) { if (index->Init(path) != 0) {}
}
} }
TEST_F(IndexEnv2, testIndex_TFileSearch) { TEST_F(IndexEnv2, testIndex_multi_thread_read) {
std::string path = "/tmp"; std::string path = "/tmp";
if (index->Init(path) != 0) { if (index->Init(path) != 0) {}
} }
TEST_F(IndexEnv2, testIndex_restart) {
std::string path = "/tmp";
if (index->Init(path) != 0) {}
}
TEST_F(IndexEnv2, testIndex_performance) {
std::string path = "/tmp";
if (index->Init(path) != 0) {}
} }
TEST_F(IndexEnv2, testIndexMultiTag) {}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册