提交 7d29af73 编写于 作者: dengyihao's avatar dengyihao

refactor code

上级 11eb2719
...@@ -63,9 +63,7 @@ static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); ...@@ -63,9 +63,7 @@ static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
// pthread_once(&isInit, indexInit); // pthread_once(&isInit, indexInit);
SIndex* sIdx = calloc(1, sizeof(SIndex)); SIndex* sIdx = calloc(1, sizeof(SIndex));
if (sIdx == NULL) { if (sIdx == NULL) { return -1; }
return -1;
}
#ifdef USE_LUCENE #ifdef USE_LUCENE
index_t* index = index_open(path); index_t* index = index_open(path);
...@@ -75,9 +73,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -75,9 +73,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
#ifdef USE_INVERTED_INDEX #ifdef USE_INVERTED_INDEX
// sIdx->cache = (void*)indexCacheCreate(sIdx); // sIdx->cache = (void*)indexCacheCreate(sIdx);
sIdx->tindex = indexTFileCreate(path); sIdx->tindex = indexTFileCreate(path);
if (sIdx->tindex == NULL) { if (sIdx->tindex == NULL) { goto END; }
goto END;
}
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->cVersion = 1; sIdx->cVersion = 1;
...@@ -88,9 +84,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -88,9 +84,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
#endif #endif
END: END:
if (sIdx != NULL) { if (sIdx != NULL) { indexClose(sIdx); }
indexClose(sIdx);
}
*index = NULL; *index = NULL;
return -1; return -1;
...@@ -106,9 +100,7 @@ void indexClose(SIndex* sIdx) { ...@@ -106,9 +100,7 @@ void indexClose(SIndex* sIdx) {
void* iter = taosHashIterate(sIdx->colObj, NULL); void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) { while (iter) {
IndexCache** pCache = iter; IndexCache** pCache = iter;
if (*pCache) { if (*pCache) { indexCacheUnRef(*pCache); }
indexCacheUnRef(*pCache);
}
iter = taosHashIterate(sIdx->colObj, iter); iter = taosHashIterate(sIdx->colObj, iter);
} }
taosHashCleanup(sIdx->colObj); taosHashCleanup(sIdx->colObj);
...@@ -166,9 +158,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -166,9 +158,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
IndexCache** cache = taosHashGet(index->colObj, buf, sz); IndexCache** cache = taosHashGet(index->colObj, buf, sz);
assert(*cache != NULL); assert(*cache != NULL);
int ret = indexCachePut(*cache, p, uid); int ret = indexCachePut(*cache, p, uid);
if (ret != 0) { if (ret != 0) { return ret; }
return ret;
}
} }
#endif #endif
...@@ -198,9 +188,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result ...@@ -198,9 +188,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
int tsz = 0; int tsz = 0;
index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz); index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz);
for (int i = 0; i < tsz; i++) { for (int i = 0; i < tsz; i++) { taosArrayPush(result, &tResult[i]); }
taosArrayPush(result, &tResult[i]);
}
for (int i = 0; i < nQuery; i++) { for (int i = 0; i < nQuery; i++) {
free(fields[i]); free(fields[i]);
...@@ -257,9 +245,7 @@ void indexOptsDestroy(SIndexOpts* opts) { ...@@ -257,9 +245,7 @@ void indexOptsDestroy(SIndexOpts* opts) {
*/ */
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) { SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery)); SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
if (p == NULL) { if (p == NULL) { return NULL; }
return NULL;
}
p->opera = opera; p->opera = opera;
p->query = taosArrayInit(4, sizeof(SIndexTermQuery)); p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
return p; return p;
...@@ -281,9 +267,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde ...@@ -281,9 +267,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName, SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
int32_t nColName, const char* colVal, int32_t nColVal) { int32_t nColName, const char* colVal, int32_t nColVal) {
SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm))); SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm)));
if (t == NULL) { if (t == NULL) { return NULL; }
return NULL;
}
t->suid = suid; t->suid = suid;
t->operType = oper; t->operType = oper;
...@@ -356,9 +340,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result ...@@ -356,9 +340,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
return 0; return 0;
} }
static void indexInterResultsDestroy(SArray* results) { static void indexInterResultsDestroy(SArray* results) {
if (results == NULL) { if (results == NULL) { return; }
return;
}
size_t sz = taosArrayGetSize(results); size_t sz = taosArrayGetSize(results);
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
...@@ -414,22 +396,16 @@ static void indexDestroyTempResult(SArray* result) { ...@@ -414,22 +396,16 @@ static void indexDestroyTempResult(SArray* result) {
taosArrayDestroy(result); taosArrayDestroy(result);
} }
int indexFlushCacheTFile(SIndex* sIdx, void* cache) { int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
if (sIdx == NULL) { if (sIdx == NULL) { return -1; }
return -1;
}
indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid); indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
IndexCache* pCache = (IndexCache*)cache; IndexCache* pCache = (IndexCache*)cache;
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName); TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
if (pReader == NULL) { if (pReader == NULL) { indexWarn("empty tfile reader found"); }
indexWarn("empty tfile reader found");
}
// handle flush // handle flush
Iterate* cacheIter = indexCacheIteratorCreate(pCache); Iterate* cacheIter = indexCacheIteratorCreate(pCache);
Iterate* tfileIter = tfileIteratorCreate(pReader); Iterate* tfileIter = tfileIteratorCreate(pReader);
if (tfileIter == NULL) { if (tfileIter == NULL) { indexWarn("empty tfile reader iterator"); }
indexWarn("empty tfile reader iterator");
}
SArray* result = taosArrayInit(1024, sizeof(void*)); SArray* result = taosArrayInit(1024, sizeof(void*));
...@@ -496,9 +472,7 @@ void iterateValueDestroy(IterateValue* value, bool destroy) { ...@@ -496,9 +472,7 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
taosArrayDestroy(value->val); taosArrayDestroy(value->val);
value->val = NULL; value->val = NULL;
} else { } else {
if (value->val != NULL) { if (value->val != NULL) { taosArrayClear(value->val); }
taosArrayClear(value->val);
}
} }
free(value->colVal); free(value->colVal);
value->colVal = NULL; value->colVal = NULL;
...@@ -521,9 +495,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { ...@@ -521,9 +495,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
tfileWriterClose(tw); tfileWriterClose(tw);
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName); TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
if (reader == NULL) { if (reader == NULL) { goto END; }
goto END;
}
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)}; ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
......
...@@ -115,9 +115,7 @@ void indexCacheDestroySkiplist(SSkipList* slt) { ...@@ -115,9 +115,7 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
tSkipListDestroy(slt); tSkipListDestroy(slt);
} }
void indexCacheDestroyImm(IndexCache* cache) { void indexCacheDestroyImm(IndexCache* cache) {
if (cache == NULL) { if (cache == NULL) { return; }
return;
}
MemTable* tbl = NULL; MemTable* tbl = NULL;
pthread_mutex_lock(&cache->mtx); pthread_mutex_lock(&cache->mtx);
...@@ -130,9 +128,7 @@ void indexCacheDestroyImm(IndexCache* cache) { ...@@ -130,9 +128,7 @@ void indexCacheDestroyImm(IndexCache* cache) {
} }
void indexCacheDestroy(void* cache) { void indexCacheDestroy(void* cache) {
IndexCache* pCache = cache; IndexCache* pCache = cache;
if (pCache == NULL) { if (pCache == NULL) { return; }
return;
}
indexMemUnRef(pCache->mem); indexMemUnRef(pCache->mem);
indexMemUnRef(pCache->imm); indexMemUnRef(pCache->imm);
free(pCache->colName); free(pCache->colName);
...@@ -142,9 +138,7 @@ void indexCacheDestroy(void* cache) { ...@@ -142,9 +138,7 @@ void indexCacheDestroy(void* cache) {
Iterate* indexCacheIteratorCreate(IndexCache* cache) { Iterate* indexCacheIteratorCreate(IndexCache* cache) {
Iterate* iiter = calloc(1, sizeof(Iterate)); Iterate* iiter = calloc(1, sizeof(Iterate));
if (iiter == NULL) { if (iiter == NULL) { return NULL; }
return NULL;
}
pthread_mutex_lock(&cache->mtx); pthread_mutex_lock(&cache->mtx);
...@@ -162,9 +156,7 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) { ...@@ -162,9 +156,7 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
return iiter; return iiter;
} }
void indexCacheIteratorDestroy(Iterate* iter) { void indexCacheIteratorDestroy(Iterate* iter) {
if (iter == NULL) { if (iter == NULL) { return; }
return;
}
tSkipListDestroyIter(iter->iter); tSkipListDestroyIter(iter->iter);
iterateValueDestroy(&iter->val, true); iterateValueDestroy(&iter->val, true);
free(iter); free(iter);
...@@ -202,17 +194,13 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { ...@@ -202,17 +194,13 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
} }
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
if (cache == NULL) { if (cache == NULL) { return -1; }
return -1;
}
IndexCache* pCache = cache; IndexCache* pCache = cache;
indexCacheRef(pCache); indexCacheRef(pCache);
// encode data // encode data
CacheTerm* ct = calloc(1, sizeof(CacheTerm)); CacheTerm* ct = calloc(1, sizeof(CacheTerm));
if (cache == NULL) { if (cache == NULL) { return -1; }
return -1;
}
// set up key // set up key
ct->colType = term->colType; ct->colType = term->colType;
ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1)); ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
...@@ -243,9 +231,7 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u ...@@ -243,9 +231,7 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
} }
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) { static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) {
if (mem == NULL) { if (mem == NULL) { return 0; }
return 0;
}
char* key = getIndexKey(ct); char* key = getIndexKey(ct);
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
...@@ -271,9 +257,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA ...@@ -271,9 +257,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA
return 0; return 0;
} }
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) { int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
if (cache == NULL) { if (cache == NULL) { return 0; }
return 0;
}
IndexCache* pCache = cache; IndexCache* pCache = cache;
MemTable *mem = NULL, *imm = NULL; MemTable *mem = NULL, *imm = NULL;
...@@ -301,33 +285,23 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV ...@@ -301,33 +285,23 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
} }
void indexCacheRef(IndexCache* cache) { void indexCacheRef(IndexCache* cache) {
if (cache == NULL) { if (cache == NULL) { return; }
return;
}
int ref = T_REF_INC(cache); int ref = T_REF_INC(cache);
UNUSED(ref); UNUSED(ref);
} }
void indexCacheUnRef(IndexCache* cache) { void indexCacheUnRef(IndexCache* cache) {
if (cache == NULL) { if (cache == NULL) { return; }
return;
}
int ref = T_REF_DEC(cache); int ref = T_REF_DEC(cache);
if (ref == 0) { if (ref == 0) { indexCacheDestroy(cache); }
indexCacheDestroy(cache);
}
} }
void indexMemRef(MemTable* tbl) { void indexMemRef(MemTable* tbl) {
if (tbl == NULL) { if (tbl == NULL) { return; }
return;
}
int ref = T_REF_INC(tbl); int ref = T_REF_INC(tbl);
UNUSED(ref); UNUSED(ref);
} }
void indexMemUnRef(MemTable* tbl) { void indexMemUnRef(MemTable* tbl) {
if (tbl == NULL) { if (tbl == NULL) { return; }
return;
}
int ref = T_REF_DEC(tbl); int ref = T_REF_DEC(tbl);
if (ref == 0) { if (ref == 0) {
SSkipList* slt = tbl->mem; SSkipList* slt = tbl->mem;
...@@ -337,9 +311,7 @@ void indexMemUnRef(MemTable* tbl) { ...@@ -337,9 +311,7 @@ void indexMemUnRef(MemTable* tbl) {
} }
static void cacheTermDestroy(CacheTerm* ct) { static void cacheTermDestroy(CacheTerm* ct) {
if (ct == NULL) { if (ct == NULL) { return; }
return;
}
free(ct->colVal); free(ct->colVal);
free(ct); free(ct);
} }
...@@ -354,9 +326,7 @@ static int32_t compareKey(const void* l, const void* r) { ...@@ -354,9 +326,7 @@ static int32_t compareKey(const void* l, const void* r) {
// compare colVal // compare colVal
int32_t cmp = strcmp(lt->colVal, rt->colVal); int32_t cmp = strcmp(lt->colVal, rt->colVal);
if (cmp == 0) { if (cmp == 0) { return rt->version - lt->version; }
return rt->version - lt->version;
}
return cmp; return cmp;
} }
...@@ -376,9 +346,7 @@ static void doMergeWork(SSchedMsg* msg) { ...@@ -376,9 +346,7 @@ static void doMergeWork(SSchedMsg* msg) {
} }
static bool indexCacheIteratorNext(Iterate* itera) { static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter; SSkipListIterator* iter = itera->iter;
if (iter == NULL) { if (iter == NULL) { return false; }
return false;
}
IterateValue* iv = &itera->val; IterateValue* iv = &itera->val;
iterateValueDestroy(iv, false); iterateValueDestroy(iv, false);
......
...@@ -55,9 +55,7 @@ static void tfileGenFileFullName(char* fullname, const char* path, uint64_t s ...@@ -55,9 +55,7 @@ static void tfileGenFileFullName(char* fullname, const char* path, uint64_t s
TFileCache* tfileCacheCreate(const char* path) { TFileCache* tfileCacheCreate(const char* path) {
TFileCache* tcache = calloc(1, sizeof(TFileCache)); TFileCache* tcache = calloc(1, sizeof(TFileCache));
if (tcache == NULL) { if (tcache == NULL) { return NULL; }
return NULL;
}
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tcache->capacity = 64; tcache->capacity = 64;
...@@ -73,9 +71,7 @@ TFileCache* tfileCacheCreate(const char* path) { ...@@ -73,9 +71,7 @@ TFileCache* tfileCacheCreate(const char* path) {
} }
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
if (reader == NULL) { if (reader == NULL) { goto End; }
goto End;
}
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName)}; ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName)};
...@@ -93,9 +89,7 @@ End: ...@@ -93,9 +89,7 @@ End:
return NULL; return NULL;
} }
void tfileCacheDestroy(TFileCache* tcache) { void tfileCacheDestroy(TFileCache* tcache) {
if (tcache == NULL) { if (tcache == NULL) { return; }
return;
}
// free table cache // free table cache
TFileReader** reader = taosHashIterate(tcache->tableCache, NULL); TFileReader** reader = taosHashIterate(tcache->tableCache, NULL);
...@@ -116,9 +110,7 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) { ...@@ -116,9 +110,7 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
int32_t sz = indexSerialCacheKey(key, buf); int32_t sz = indexSerialCacheKey(key, buf);
assert(sz < sizeof(buf)); assert(sz < sizeof(buf));
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz); TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
if (reader == NULL) { if (reader == NULL) { return NULL; }
return NULL;
}
tfileReaderRef(*reader); tfileReaderRef(*reader);
return *reader; return *reader;
...@@ -141,9 +133,7 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { ...@@ -141,9 +133,7 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
} }
TFileReader* tfileReaderCreate(WriterCtx* ctx) { TFileReader* tfileReaderCreate(WriterCtx* ctx) {
TFileReader* reader = calloc(1, sizeof(TFileReader)); TFileReader* reader = calloc(1, sizeof(TFileReader));
if (reader == NULL) { if (reader == NULL) { return NULL; }
return NULL;
}
// T_REF_INC(reader); // T_REF_INC(reader);
reader->ctx = ctx; reader->ctx = ctx;
...@@ -163,9 +153,7 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { ...@@ -163,9 +153,7 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
return reader; return reader;
} }
void tfileReaderDestroy(TFileReader* reader) { void tfileReaderDestroy(TFileReader* reader) {
if (reader == NULL) { if (reader == NULL) { return; }
return;
}
// T_REF_INC(reader); // T_REF_INC(reader);
fstDestroy(reader->fst); fstDestroy(reader->fst);
writerCtxDestroy(reader->ctx, reader->remove); writerCtxDestroy(reader->ctx, reader->remove);
...@@ -205,9 +193,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c ...@@ -205,9 +193,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c
tfileGenFileFullName(fullname, path, suid, colName, version); tfileGenFileFullName(fullname, path, suid, colName, version);
// indexInfo("open write file name %s", fullname); // indexInfo("open write file name %s", fullname);
WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64); WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
if (wcx == NULL) { if (wcx == NULL) { return NULL; }
return NULL;
}
TFileHeader tfh = {0}; TFileHeader tfh = {0};
tfh.suid = suid; tfh.suid = suid;
...@@ -223,9 +209,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c ...@@ -223,9 +209,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024); WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size); indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size);
if (wc == NULL) { if (wc == NULL) { return NULL; }
return NULL;
}
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
return reader; return reader;
...@@ -312,25 +296,19 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { ...@@ -312,25 +296,19 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
return 0; return 0;
} }
void tfileWriterClose(TFileWriter* tw) { void tfileWriterClose(TFileWriter* tw) {
if (tw == NULL) { if (tw == NULL) { return; }
return;
}
writerCtxDestroy(tw->ctx, false); writerCtxDestroy(tw->ctx, false);
free(tw); free(tw);
} }
void tfileWriterDestroy(TFileWriter* tw) { void tfileWriterDestroy(TFileWriter* tw) {
if (tw == NULL) { if (tw == NULL) { return; }
return;
}
writerCtxDestroy(tw->ctx, false); writerCtxDestroy(tw->ctx, false);
free(tw); free(tw);
} }
IndexTFile* indexTFileCreate(const char* path) { IndexTFile* indexTFileCreate(const char* path) {
TFileCache* cache = tfileCacheCreate(path); TFileCache* cache = tfileCacheCreate(path);
if (cache == NULL) { if (cache == NULL) { return NULL; }
return NULL;
}
IndexTFile* tfile = calloc(1, sizeof(IndexTFile)); IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
if (tfile == NULL) { if (tfile == NULL) {
...@@ -342,27 +320,21 @@ IndexTFile* indexTFileCreate(const char* path) { ...@@ -342,27 +320,21 @@ IndexTFile* indexTFileCreate(const char* path) {
return tfile; return tfile;
} }
void indexTFileDestroy(IndexTFile* tfile) { void indexTFileDestroy(IndexTFile* tfile) {
if (tfile == NULL) { if (tfile == NULL) { return; }
return;
}
tfileCacheDestroy(tfile->cache); tfileCacheDestroy(tfile->cache);
free(tfile); free(tfile);
} }
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
int ret = -1; int ret = -1;
if (tfile == NULL) { if (tfile == NULL) { return ret; }
return ret;
}
IndexTFile* pTfile = (IndexTFile*)tfile; IndexTFile* pTfile = (IndexTFile*)tfile;
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName}; ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
TFileReader* reader = tfileCacheGet(pTfile->cache, &key); TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
if (reader == NULL) { if (reader == NULL) { return 0; }
return 0;
}
return tfileReaderSearch(reader, query, result); return tfileReaderSearch(reader, query, result);
} }
...@@ -384,9 +356,7 @@ static bool tfileIteratorNext(Iterate* iiter) { ...@@ -384,9 +356,7 @@ static bool tfileIteratorNext(Iterate* iiter) {
TFileFstIter* tIter = iiter->iter; TFileFstIter* tIter = iiter->iter;
StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL); StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL);
if (rt == NULL) { if (rt == NULL) { return false; }
return false;
}
int32_t sz = 0; int32_t sz = 0;
char* ch = (char*)fstSliceData(&rt->data, &sz); char* ch = (char*)fstSliceData(&rt->data, &sz);
...@@ -396,9 +366,7 @@ static bool tfileIteratorNext(Iterate* iiter) { ...@@ -396,9 +366,7 @@ static bool tfileIteratorNext(Iterate* iiter) {
offset = (uint64_t)(rt->out.out); offset = (uint64_t)(rt->out.out);
swsResultDestroy(rt); swsResultDestroy(rt);
// set up iterate value // set up iterate value
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; }
return false;
}
iv->colVal = colVal; iv->colVal = colVal;
return true; return true;
...@@ -409,9 +377,7 @@ static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; } ...@@ -409,9 +377,7 @@ static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) { static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
TFileFstIter* tIter = calloc(1, sizeof(TFileFstIter)); TFileFstIter* tIter = calloc(1, sizeof(TFileFstIter));
if (tIter == NULL) { if (tIter == NULL) { return NULL; }
return NULL;
}
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS); tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
tIter->fb = fstSearch(reader->fst, tIter->ctx); tIter->fb = fstSearch(reader->fst, tIter->ctx);
...@@ -421,9 +387,7 @@ static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) { ...@@ -421,9 +387,7 @@ static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
} }
Iterate* tfileIteratorCreate(TFileReader* reader) { Iterate* tfileIteratorCreate(TFileReader* reader) {
if (reader == NULL) { if (reader == NULL) { return NULL; }
return NULL;
}
Iterate* iter = calloc(1, sizeof(Iterate)); Iterate* iter = calloc(1, sizeof(Iterate));
iter->iter = tfileFstIteratorCreate(reader); iter->iter = tfileFstIteratorCreate(reader);
...@@ -438,9 +402,7 @@ Iterate* tfileIteratorCreate(TFileReader* reader) { ...@@ -438,9 +402,7 @@ Iterate* tfileIteratorCreate(TFileReader* reader) {
return iter; return iter;
} }
void tfileIteratorDestroy(Iterate* iter) { void tfileIteratorDestroy(Iterate* iter) {
if (iter == NULL) { if (iter == NULL) { return; }
return;
}
IterateValue* iv = &iter->val; IterateValue* iv = &iter->val;
iterateValueDestroy(iv, true); iterateValueDestroy(iv, true);
...@@ -455,9 +417,7 @@ void tfileIteratorDestroy(Iterate* iter) { ...@@ -455,9 +417,7 @@ void tfileIteratorDestroy(Iterate* iter) {
} }
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) { TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) {
if (tf == NULL) { if (tf == NULL) { return NULL; }
return NULL;
}
ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
return tfileCacheGet(tf->cache, &key); return tfileCacheGet(tf->cache, &key);
} }
...@@ -469,9 +429,7 @@ static int tfileUidCompare(const void* a, const void* b) { ...@@ -469,9 +429,7 @@ static int tfileUidCompare(const void* a, const void* b) {
} }
static int tfileStrCompare(const void* a, const void* b) { static int tfileStrCompare(const void* a, const void* b) {
int ret = strcmp((char*)a, (char*)b); int ret = strcmp((char*)a, (char*)b);
if (ret == 0) { if (ret == 0) { return ret; }
return ret;
}
return ret < 0 ? -1 : 1; return ret < 0 ? -1 : 1;
} }
...@@ -486,17 +444,13 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) { ...@@ -486,17 +444,13 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
TFileValue* tfileValueCreate(char* val) { TFileValue* tfileValueCreate(char* val) {
TFileValue* tf = calloc(1, sizeof(TFileValue)); TFileValue* tf = calloc(1, sizeof(TFileValue));
if (tf == NULL) { if (tf == NULL) { return NULL; }
return NULL;
}
tf->colVal = tstrdup(val); tf->colVal = tstrdup(val);
tf->tableId = taosArrayInit(32, sizeof(uint64_t)); tf->tableId = taosArrayInit(32, sizeof(uint64_t));
return tf; return tf;
} }
int tfileValuePush(TFileValue* tf, uint64_t val) { int tfileValuePush(TFileValue* tf, uint64_t val) {
if (tf == NULL) { if (tf == NULL) { return -1; }
return -1;
}
taosArrayPush(tf->tableId, &val); taosArrayPush(tf->tableId, &val);
return 0; return 0;
} }
...@@ -517,9 +471,7 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) { ...@@ -517,9 +471,7 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) { static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
int32_t fstOffset = offset + sizeof(tw->header.fstOffset); int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
tw->header.fstOffset = fstOffset; tw->header.fstOffset = fstOffset;
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
return -1;
}
tw->offset += sizeof(fstOffset); tw->offset += sizeof(fstOffset);
return 0; return 0;
} }
...@@ -530,9 +482,7 @@ static int tfileWriteHeader(TFileWriter* writer) { ...@@ -530,9 +482,7 @@ static int tfileWriteHeader(TFileWriter* writer) {
memcpy(buf, (char*)header, sizeof(buf)); memcpy(buf, (char*)header, sizeof(buf));
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf)); int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
if (sizeof(buf) != nwrite) { if (sizeof(buf) != nwrite) { return -1; }
return -1;
}
writer->offset = nwrite; writer->offset = nwrite;
return 0; return 0;
} }
...@@ -574,9 +524,7 @@ static int tfileReaderLoadFst(TFileReader* reader) { ...@@ -574,9 +524,7 @@ static int tfileReaderLoadFst(TFileReader* reader) {
static int FST_MAX_SIZE = 64 * 1024 * 1024; static int FST_MAX_SIZE = 64 * 1024 * 1024;
char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE); char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE);
if (buf == NULL) { if (buf == NULL) { return -1; }
return -1;
}
WriterCtx* ctx = reader->ctx; WriterCtx* ctx = reader->ctx;
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
...@@ -601,31 +549,23 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* ...@@ -601,31 +549,23 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
int32_t total = sizeof(uint64_t) * nid; int32_t total = sizeof(uint64_t) * nid;
char* buf = calloc(1, total); char* buf = calloc(1, total);
if (buf == NULL) { if (buf == NULL) { return -1; }
return -1;
}
nread = ctx->readFrom(ctx, buf, total, offset + sizeof(nid)); nread = ctx->readFrom(ctx, buf, total, offset + sizeof(nid));
assert(total == nread); assert(total == nread);
for (int32_t i = 0; i < nid; i++) { for (int32_t i = 0; i < nid; i++) { taosArrayPush(result, (uint64_t*)buf + i); }
taosArrayPush(result, (uint64_t*)buf + i);
}
free(buf); free(buf);
return 0; return 0;
} }
void tfileReaderRef(TFileReader* reader) { void tfileReaderRef(TFileReader* reader) {
if (reader == NULL) { if (reader == NULL) { return; }
return;
}
int ref = T_REF_INC(reader); int ref = T_REF_INC(reader);
UNUSED(ref); UNUSED(ref);
} }
void tfileReaderUnRef(TFileReader* reader) { void tfileReaderUnRef(TFileReader* reader) {
if (reader == NULL) { if (reader == NULL) { return; }
return;
}
int ref = T_REF_DEC(reader); int ref = T_REF_DEC(reader);
if (ref == 0) { if (ref == 0) {
// do nothing // do nothing
...@@ -641,15 +581,11 @@ static SArray* tfileGetFileList(const char* path) { ...@@ -641,15 +581,11 @@ static SArray* tfileGetFileList(const char* path) {
uint32_t version; uint32_t version;
DIR* dir = opendir(path); DIR* dir = opendir(path);
if (NULL == dir) { if (NULL == dir) { return NULL; }
return NULL;
}
struct dirent* entry; struct dirent* entry;
while ((entry = readdir(dir)) != NULL) { while ((entry = readdir(dir)) != NULL) {
char* file = entry->d_name; char* file = entry->d_name;
if (0 != tfileParseFileName(file, &suid, buf, &version)) { if (0 != tfileParseFileName(file, &suid, buf, &version)) { continue; }
continue;
}
size_t len = strlen(path) + 1 + strlen(file) + 1; size_t len = strlen(path) + 1 + strlen(file) + 1;
char* buf = calloc(1, len); char* buf = calloc(1, len);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册