From 9233663a985c4aca1bfd925d006320ebce0da156 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 22 Feb 2022 22:02:28 +0800 Subject: [PATCH] add index test U --- source/libs/index/src/index.c | 56 +++-- source/libs/index/src/index_cache.c | 69 ++++-- source/libs/index/src/index_fst.c | 198 +++++++++++----- source/libs/index/src/index_fst_automation.c | 24 +- source/libs/index/src/index_fst_util.c | 25 +- source/libs/index/src/index_tfile.c | 126 +++++++--- source/libs/index/test/CMakeLists.txt | 20 ++ source/libs/index/test/fstTest.cc | 11 +- source/libs/index/test/fstUT.cc | 232 +++++++++++++++++++ 9 files changed, 602 insertions(+), 159 deletions(-) create mode 100644 source/libs/index/test/fstUT.cc diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 0c222eae1a..9287a91828 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -66,7 +66,9 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv); int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { pthread_once(&isInit, indexInit); SIndex* sIdx = calloc(1, sizeof(SIndex)); - if (sIdx == NULL) { return -1; } + if (sIdx == NULL) { + return -1; + } #ifdef USE_LUCENE index_t* index = index_open(path); @@ -76,7 +78,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { #ifdef USE_INVERTED_INDEX // sIdx->cache = (void*)indexCacheCreate(sIdx); sIdx->tindex = indexTFileCreate(path); - if (sIdx->tindex == NULL) { goto END; } + if (sIdx->tindex == NULL) { + goto END; + } sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); sIdx->cVersion = 1; @@ -87,7 +91,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { #endif END: - if (sIdx != NULL) { indexClose(sIdx); } + if (sIdx != NULL) { + indexClose(sIdx); + } *index = NULL; return -1; @@ -103,7 +109,9 @@ void indexClose(SIndex* sIdx) { void* iter = taosHashIterate(sIdx->colObj, NULL); while (iter) { IndexCache** pCache = iter; - if (*pCache) { indexCacheUnRef(*pCache); } + if (*pCache) { + indexCacheUnRef(*pCache); + } iter = taosHashIterate(sIdx->colObj, iter); } taosHashCleanup(sIdx->colObj); @@ -161,7 +169,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { IndexCache** cache = taosHashGet(index->colObj, buf, sz); assert(*cache != NULL); int ret = indexCachePut(*cache, p, uid); - if (ret != 0) { return ret; } + if (ret != 0) { + return ret; + } } #endif @@ -191,7 +201,9 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result int tsz = 0; index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz); - for (int i = 0; i < tsz; i++) { taosArrayPush(result, &tResult[i]); } + for (int i = 0; i < tsz; i++) { + taosArrayPush(result, &tResult[i]); + } for (int i = 0; i < nQuery; i++) { free(fields[i]); @@ -248,7 +260,9 @@ void indexOptsDestroy(SIndexOpts* opts) { */ SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) { SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery)); - if (p == NULL) { return NULL; } + if (p == NULL) { + return NULL; + } p->opera = opera; p->query = taosArrayInit(4, sizeof(SIndexTermQuery)); return p; @@ -270,7 +284,9 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName, int32_t nColName, const char* colVal, int32_t nColVal) { SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm))); - if (t == NULL) { return NULL; } + if (t == NULL) { + return NULL; + } t->suid = suid; t->operType = oper; @@ -343,7 +359,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result return 0; } static void indexInterResultsDestroy(SArray* results) { - if (results == NULL) { return; } + if (results == NULL) { + return; + } size_t sz = taosArrayGetSize(results); for (size_t i = 0; i < sz; i++) { @@ -419,18 +437,24 @@ static void indexDestroyTempResult(SArray* result) { taosArrayDestroy(result); } int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { - if (sIdx == NULL) { return -1; } + if (sIdx == NULL) { + return -1; + } indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid); int64_t st = taosGetTimestampUs(); IndexCache* pCache = (IndexCache*)cache; TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName); - if (pReader == NULL) { indexWarn("empty tfile reader found"); } + if (pReader == NULL) { + indexWarn("empty tfile reader found"); + } // handle flush Iterate* cacheIter = indexCacheIteratorCreate(pCache); Iterate* tfileIter = tfileIteratorCreate(pReader); - if (tfileIter == NULL) { indexWarn("empty tfile reader iterator"); } + if (tfileIter == NULL) { + indexWarn("empty tfile reader iterator"); + } SArray* result = taosArrayInit(1024, sizeof(void*)); @@ -484,7 +508,9 @@ void iterateValueDestroy(IterateValue* value, bool destroy) { taosArrayDestroy(value->val); value->val = NULL; } else { - if (value->val != NULL) { taosArrayClear(value->val); } + if (value->val != NULL) { + taosArrayClear(value->val); + } } free(value->colVal); value->colVal = NULL; @@ -507,7 +533,9 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { tfileWriterClose(tw); TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName); - if (reader == NULL) { return -1; } + if (reader == NULL) { + return -1; + } TFileHeader* header = &reader->header; ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)}; diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 48566a8674..d6a7141825 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -119,13 +119,17 @@ void indexCacheDestroySkiplist(SSkipList* slt) { tSkipListDestroy(slt); } void indexCacheDestroyImm(IndexCache* cache) { - if (cache == NULL) { return; } + if (cache == NULL) { + return; + } MemTable* tbl = NULL; pthread_mutex_lock(&cache->mtx); + tbl = cache->imm; cache->imm = NULL; // or throw int bg thread pthread_cond_broadcast(&cache->finished); + pthread_mutex_unlock(&cache->mtx); indexMemUnRef(tbl); @@ -133,7 +137,9 @@ void indexCacheDestroyImm(IndexCache* cache) { } void indexCacheDestroy(void* cache) { IndexCache* pCache = cache; - if (pCache == NULL) { return; } + if (pCache == NULL) { + return; + } indexMemUnRef(pCache->mem); indexMemUnRef(pCache->imm); free(pCache->colName); @@ -146,7 +152,9 @@ void indexCacheDestroy(void* cache) { Iterate* indexCacheIteratorCreate(IndexCache* cache) { Iterate* iiter = calloc(1, sizeof(Iterate)); - if (iiter == NULL) { return NULL; } + if (iiter == NULL) { + return NULL; + } pthread_mutex_lock(&cache->mtx); @@ -164,7 +172,9 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) { return iiter; } void indexCacheIteratorDestroy(Iterate* iter) { - if (iter == NULL) { return; } + if (iter == NULL) { + return; + } tSkipListDestroyIter(iter->iter); iterateValueDestroy(&iter->val, true); free(iter); @@ -186,9 +196,6 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { } else if (cache->imm != NULL) { // TODO: wake up by condition variable pthread_cond_wait(&cache->finished, &cache->mtx); - // pthread_mutex_unlock(&cache->mtx); - // taosMsleep(50); - // pthread_mutex_lock(&cache->mtx); } else { indexCacheRef(cache); cache->imm = cache->mem; @@ -202,13 +209,17 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { } int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { - if (cache == NULL) { return -1; } + if (cache == NULL) { + return -1; + } IndexCache* pCache = cache; indexCacheRef(pCache); // encode data CacheTerm* ct = calloc(1, sizeof(CacheTerm)); - if (cache == NULL) { return -1; } + if (cache == NULL) { + return -1; + } // set up key ct->colType = term->colType; ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1)); @@ -240,7 +251,9 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u } static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) { - if (mem == NULL) { return 0; } + if (mem == NULL) { + return 0; + } char* key = indexCacheTermGet(ct); SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); @@ -266,7 +279,9 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA return 0; } int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) { - if (cache == NULL) { return 0; } + if (cache == NULL) { + return 0; + } IndexCache* pCache = cache; MemTable *mem = NULL, *imm = NULL; @@ -294,23 +309,33 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV } void indexCacheRef(IndexCache* cache) { - if (cache == NULL) { return; } + if (cache == NULL) { + return; + } int ref = T_REF_INC(cache); UNUSED(ref); } void indexCacheUnRef(IndexCache* cache) { - if (cache == NULL) { return; } + if (cache == NULL) { + return; + } int ref = T_REF_DEC(cache); - if (ref == 0) { indexCacheDestroy(cache); } + if (ref == 0) { + indexCacheDestroy(cache); + } } void indexMemRef(MemTable* tbl) { - if (tbl == NULL) { return; } + if (tbl == NULL) { + return; + } int ref = T_REF_INC(tbl); UNUSED(ref); } void indexMemUnRef(MemTable* tbl) { - if (tbl == NULL) { return; } + if (tbl == NULL) { + return; + } int ref = T_REF_DEC(tbl); if (ref == 0) { SSkipList* slt = tbl->mem; @@ -320,7 +345,9 @@ void indexMemUnRef(MemTable* tbl) { } static void indexCacheTermDestroy(CacheTerm* ct) { - if (ct == NULL) { return; } + if (ct == NULL) { + return; + } free(ct->colVal); free(ct); } @@ -333,7 +360,9 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) { CacheTerm* rt = (CacheTerm*)r; // compare colVal int32_t cmp = strcmp(lt->colVal, rt->colVal); - if (cmp == 0) { return rt->version - lt->version; } + if (cmp == 0) { + return rt->version - lt->version; + } return cmp; } @@ -354,7 +383,9 @@ static void doMergeWork(SSchedMsg* msg) { } static bool indexCacheIteratorNext(Iterate* itera) { SSkipListIterator* iter = itera->iter; - if (iter == NULL) { return false; } + if (iter == NULL) { + return false; + } IterateValue* iv = &itera->val; iterateValueDestroy(iv, false); diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 46b4c9d7c6..3664bcfad0 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -31,20 +31,24 @@ static uint8_t fstPackDetla(FstCountingWriter* wrt, CompiledAddr nodeAddr, Compi FstUnFinishedNodes* fstUnFinishedNodesCreate() { FstUnFinishedNodes* nodes = malloc(sizeof(FstUnFinishedNodes)); - if (nodes == NULL) { return NULL; } + if (nodes == NULL) { + return NULL; + } nodes->stack = (SArray*)taosArrayInit(64, sizeof(FstBuilderNodeUnfinished)); fstUnFinishedNodesPushEmpty(nodes, false); return nodes; } -void unFinishedNodeDestroyElem(void* elem) { +static void unFinishedNodeDestroyElem(void* elem) { FstBuilderNodeUnfinished* b = (FstBuilderNodeUnfinished*)elem; fstBuilderNodeDestroy(b->node); free(b->last); b->last = NULL; } void fstUnFinishedNodesDestroy(FstUnFinishedNodes* nodes) { - if (nodes == NULL) { return; } + if (nodes == NULL) { + return; + } taosArrayDestroyEx(nodes->stack, unFinishedNodeDestroyElem); free(nodes); @@ -92,7 +96,9 @@ void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes* nodes, CompiledAddr add } void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output out) { FstSlice* s = &bs; - if (fstSliceIsEmpty(s)) { return; } + if (fstSliceIsEmpty(s)) { + return; + } size_t sz = taosArrayGetSize(nodes->stack) - 1; FstBuilderNodeUnfinished* un = taosArrayGet(nodes->stack, sz); assert(un->last == NULL); @@ -172,7 +178,9 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes* node, FstState fstStateCreateFrom(FstSlice* slice, CompiledAddr addr) { FstState fs = {.state = EmptyFinal, .val = 0}; - if (addr == EMPTY_ADDRESS) { return fs; } + if (addr == EMPTY_ADDRESS) { + return fs; + } uint8_t* data = fstSliceData(slice, NULL); uint8_t v = data[addr]; @@ -229,7 +237,9 @@ void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTran fstStateSetCommInput(&st, trn->inp); bool null = false; uint8_t inp = fstStateCommInput(&st, &null); - if (null == true) { fstCountingWriterWrite(w, (char*)&trn->inp, sizeof(trn->inp)); } + if (null == true) { + fstCountingWriterWrite(w, (char*)&trn->inp, sizeof(trn->inp)); + } fstCountingWriterWrite(w, (char*)(&(st.val)), sizeof(st.val)); return; } @@ -263,7 +273,9 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil fstStateSetStateNtrans(&st, (uint8_t)sz); if (anyOuts) { - if (FST_BUILDER_NODE_IS_FINAL(node)) { fstCountingWriterPackUintIn(w, node->finalOutput, oSize); } + if (FST_BUILDER_NODE_IS_FINAL(node)) { + fstCountingWriterPackUintIn(w, node->finalOutput, oSize); + } for (int32_t i = sz - 1; i >= 0; i--) { FstTransition* t = taosArrayGet(node->trans, i); fstCountingWriterPackUintIn(w, t->out, oSize); @@ -428,7 +440,9 @@ Output fstStateOutput(FstState* s, FstNode* node) { assert(s->state == OneTrans); uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(node->sizes); - if (oSizes == 0) { return 0; } + if (oSizes == 0) { + return 0; + } FstSlice* slice = &node->data; uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes); @@ -440,7 +454,9 @@ Output fstStateOutputForAnyTrans(FstState* s, FstNode* node, uint64_t i) { assert(s->state == AnyTrans); uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(node->sizes); - if (oSizes == 0) { return 0; } + if (oSizes == 0) { + return 0; + } FstSlice* slice = &node->data; uint8_t* data = fstSliceData(slice, NULL); uint64_t at = node->start - fstStateNtransLen(s) - 1 // pack size @@ -453,7 +469,9 @@ Output fstStateOutputForAnyTrans(FstState* s, FstNode* node, uint64_t i) { void fstStateSetFinalState(FstState* s, bool yes) { assert(s->state == AnyTrans); - if (yes) { s->val |= 0b01000000; } + if (yes) { + s->val |= 0b01000000; + } return; } bool fstStateIsFinalState(FstState* s) { @@ -463,7 +481,9 @@ bool fstStateIsFinalState(FstState* s) { void fstStateSetStateNtrans(FstState* s, uint8_t n) { assert(s->state == AnyTrans); - if (n <= 0b00111111) { s->val = (s->val & 0b11000000) | n; } + if (n <= 0b00111111) { + s->val = (s->val & 0b11000000) | n; + } return; } // state_ntrans @@ -495,7 +515,9 @@ uint64_t fstStateNtransLen(FstState* s) { uint64_t fstStateNtrans(FstState* s, FstSlice* slice) { bool null = false; uint8_t n = fstStateStateNtrans(s, &null); - if (null != true) { return n; } + if (null != true) { + return n; + } int32_t len; uint8_t* data = fstSliceData(slice, &len); n = data[len - 2]; @@ -505,7 +527,9 @@ uint64_t fstStateNtrans(FstState* s, FstSlice* slice) { } Output fstStateFinalOutput(FstState* s, uint64_t version, FstSlice* slice, PackSizes sizes, uint64_t nTrans) { uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(sizes); - if (oSizes == 0 || !fstStateIsFinalState(s)) { return 0; } + if (oSizes == 0 || !fstStateIsFinalState(s)) { + return 0; + } uint64_t at = FST_SLICE_LEN(slice) - 1 - fstStateNtransLen(s) - 1 // pack size - fstStateTotalTransSize(s, version, sizes, nTrans) - (nTrans * oSizes) - oSizes; @@ -522,7 +546,9 @@ uint64_t fstStateFindInput(FstState* s, FstNode* node, uint8_t b, bool* null) { uint8_t* data = fstSliceData(slice, &dlen); uint64_t i = data[at + b]; // uint64_t i = slice->data[slice->start + at + b]; - if (i >= node->nTrans) { *null = true; } + if (i >= node->nTrans) { + *null = true; + } return i; } else { uint64_t start = node->start - fstStateNtransLen(s) - 1 // pack size @@ -539,7 +565,9 @@ uint64_t fstStateFindInput(FstState* s, FstNode* node, uint8_t b, bool* null) { return node->nTrans - i - 1; // bug } } - if (i == len) { *null = true; } + if (i == len) { + *null = true; + } fstSliceDestroy(&t); } } @@ -548,7 +576,9 @@ uint64_t fstStateFindInput(FstState* s, FstNode* node, uint8_t b, bool* null) { FstNode* fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice* slice) { FstNode* n = (FstNode*)malloc(sizeof(FstNode)); - if (n == NULL) { return NULL; } + if (n == NULL) { + return NULL; + } FstState st = fstStateCreateFrom(slice, addr); @@ -614,7 +644,9 @@ void fstNodeDestroy(FstNode* node) { } FstTransitions* fstNodeTransitions(FstNode* node) { FstTransitions* t = malloc(sizeof(FstTransitions)); - if (NULL == t) { return NULL; } + if (NULL == t) { + return NULL; + } FstRange range = {.start = 0, .end = FST_NODE_LEN(node)}; t->range = range; t->node = node; @@ -721,7 +753,9 @@ bool fstBuilderNodeCompileTo(FstBuilderNode* b, FstCountingWriter* wrt, Compiled FstBuilder* fstBuilderCreate(void* w, FstType ty) { FstBuilder* b = malloc(sizeof(FstBuilder)); - if (NULL == b) { return b; } + if (NULL == b) { + return b; + } b->wrt = fstCountingWriterCreate(w); b->unfinished = fstUnFinishedNodesCreate(); @@ -735,15 +769,17 @@ FstBuilder* fstBuilderCreate(void* w, FstType ty) { taosEncodeFixedU64(&pBuf64, VERSION); fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64)); - memset(buf64, 0, sizeof(buf64)); pBuf64 = buf64; + memset(buf64, 0, sizeof(buf64)); taosEncodeFixedU64(&pBuf64, ty); fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64)); return b; } void fstBuilderDestroy(FstBuilder* b) { - if (b == NULL) { return; } + if (b == NULL) { + return; + } fstCountingWriterDestroy(b->wrt); fstUnFinishedNodesDestroy(b->unfinished); @@ -830,6 +866,7 @@ void fstBuilderCompileFrom(FstBuilder* b, uint64_t istate) { fstUnFinishedNodesTopLastFreeze(b->unfinished, addr); return; } + CompiledAddr fstBuilderCompile(FstBuilder* b, FstBuilderNode* bn) { if (FST_BUILDER_NODE_IS_FINAL(bn) && FST_BUILDER_NODE_TRANS_ISEMPTY(bn) && FST_BUILDER_NODE_FINALOUTPUT_ISZERO(bn)) { return EMPTY_ADDRESS; @@ -844,7 +881,9 @@ CompiledAddr fstBuilderCompile(FstBuilder* b, FstBuilderNode* bn) { fstBuilderNodeCompileTo(bn, b->wrt, b->lastAddr, startAddr); b->lastAddr = (CompiledAddr)(FST_WRITER_COUNT(b->wrt) - 1); - if (entry->state == NOTFOUND) { FST_REGISTRY_CELL_INSERT(entry->cell, b->lastAddr); } + if (entry->state == NOTFOUND) { + FST_REGISTRY_CELL_INSERT(entry->cell, b->lastAddr); + } fstRegistryEntryDestroy(entry); return b->lastAddr; @@ -887,7 +926,9 @@ FstSlice fstNodeAsSlice(FstNode* node) { FstLastTransition* fstLastTransitionCreate(uint8_t inp, Output out) { FstLastTransition* trn = malloc(sizeof(FstLastTransition)); - if (trn == NULL) { return NULL; } + if (trn == NULL) { + return NULL; + } trn->inp = inp; trn->out = out; @@ -897,7 +938,9 @@ FstLastTransition* fstLastTransitionCreate(uint8_t inp, Output out) { void fstLastTransitionDestroy(FstLastTransition* trn) { free(trn); } void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, CompiledAddr addr) { FstLastTransition* trn = unNode->last; - if (trn == NULL) { return; } + if (trn == NULL) { + return; + } FstTransition t = {.inp = trn->inp, .out = trn->out, .addr = addr}; taosArrayPush(unNode->node->trans, &t); fstLastTransitionDestroy(trn); @@ -906,27 +949,35 @@ void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, Comp } void fstBuilderNodeUnfinishedAddOutputPrefix(FstBuilderNodeUnfinished* unNode, Output out) { - if (FST_BUILDER_NODE_IS_FINAL(unNode->node)) { unNode->node->finalOutput += out; } + if (FST_BUILDER_NODE_IS_FINAL(unNode->node)) { + unNode->node->finalOutput += out; + } size_t sz = taosArrayGetSize(unNode->node->trans); for (size_t i = 0; i < sz; i++) { FstTransition* trn = taosArrayGet(unNode->node->trans, i); trn->out += out; } - if (unNode->last) { unNode->last->out += out; } + if (unNode->last) { + unNode->last->out += out; + } return; } Fst* fstCreate(FstSlice* slice) { int32_t slen; char* buf = fstSliceData(slice, &slen); - if (slen < 36) { return NULL; } + if (slen < 36) { + return NULL; + } uint64_t len = slen; uint64_t skip = 0; uint64_t version; taosDecodeFixedU64(buf, &version); skip += sizeof(version); - if (version == 0 || version > VERSION) { return NULL; } + if (version == 0 || version > VERSION) { + return NULL; + } uint64_t type; taosDecodeFixedU64(buf + skip, &type); @@ -949,10 +1000,14 @@ Fst* fstCreate(FstSlice* slice) { taosDecodeFixedU64(buf + len, &fstLen); // TODO(validate root addr) Fst* fst = (Fst*)calloc(1, sizeof(Fst)); - if (fst == NULL) { return NULL; } + if (fst == NULL) { + return NULL; + } fst->meta = (FstMeta*)malloc(sizeof(FstMeta)); - if (NULL == fst->meta) { goto FST_CREAT_FAILED; } + if (NULL == fst->meta) { + goto FST_CREAT_FAILED; + } fst->meta->version = version; fst->meta->rootAddr = rootAddr; @@ -983,7 +1038,7 @@ void fstDestroy(Fst* fst) { bool fstGet(Fst* fst, FstSlice* b, Output* out) { // dec lock range - pthread_mutex_lock(&fst->mtx); + // pthread_mutex_lock(&fst->mtx); FstNode* root = fstGetRoot(fst); Output tOut = 0; int32_t len; @@ -996,7 +1051,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { uint8_t inp = data[i]; Output res = 0; if (false == fstNodeFindInput(root, inp, &res)) { - pthread_mutex_unlock(&fst->mtx); + // pthread_mutex_unlock(&fst->mtx); return false; } @@ -1007,7 +1062,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { taosArrayPush(nodes, &root); } if (!FST_NODE_IS_FINAL(root)) { - pthread_mutex_unlock(&fst->mtx); + // pthread_mutex_unlock(&fst->mtx); return false; } else { tOut = tOut + FST_NODE_FINAL_OUTPUT(root); @@ -1018,8 +1073,8 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { fstNodeDestroy(*node); } taosArrayDestroy(nodes); - fst->root = NULL; - pthread_mutex_unlock(&fst->mtx); + // fst->root = NULL; + // pthread_mutex_unlock(&fst->mtx); *out = tOut; return true; } @@ -1028,7 +1083,9 @@ FstStreamBuilder* fstSearch(Fst* fst, AutomationCtx* ctx) { return fstStreamBuilderCreate(fst, ctx); } StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) { - if (sb == NULL) { return NULL; } + if (sb == NULL) { + return NULL; + } return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max); } FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) { @@ -1039,15 +1096,6 @@ FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) { FstNode* fstGetRoot(Fst* fst) { CompiledAddr rAddr = fstGetRootAddr(fst); return fstGetNode(fst, rAddr); - // pthread_mutex_lock(&fst->mtx); - // if (fst->root != NULL) { - // // pthread_mutex_unlock(&fst->mtx); - // return fst->root; - //} - // CompiledAddr rAddr = fstGetRootAddr(fst); - // fst->root = fstGetNode(fst, rAddr); - //// pthread_mutex_unlock(&fst->mtx); - // return fst->root; } FstNode* fstGetNode(Fst* fst, CompiledAddr addr) { @@ -1074,14 +1122,18 @@ bool fstVerify(Fst* fst) { uint32_t len, checkSum = fst->meta->checkSum; uint8_t* data = fstSliceData(fst->data, &len); TSCKSUM initSum = 0; - if (!taosCheckChecksumWhole(data, len)) { return false; } + if (!taosCheckChecksumWhole(data, len)) { + return false; + } return true; } // data bound function FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice* data) { FstBoundWithData* b = calloc(1, sizeof(FstBoundWithData)); - if (b == NULL) { return NULL; } + if (b == NULL) { + return NULL; + } if (data != NULL) { b->data = fstSliceCopy(data, data->start, data->end); @@ -1118,7 +1170,9 @@ void fstBoundDestroy(FstBoundWithData* bound) { free(bound); } StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, FstBoundWithData* max) { StreamWithState* sws = calloc(1, sizeof(StreamWithState)); - if (sws == NULL) { return NULL; } + if (sws == NULL) { + return NULL; + } sws->fst = fst; sws->aut = automation; @@ -1134,7 +1188,9 @@ StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstB return sws; } void streamWithStateDestroy(StreamWithState* sws) { - if (sws == NULL) { return; } + if (sws == NULL) { + return; + } taosArrayDestroy(sws->inp); taosArrayDestroyEx(sws->stack, streamStateDestroy); @@ -1200,7 +1256,9 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) { uint64_t i = 0; for (i = trans->range.start; i < trans->range.end; i++) { FstTransition trn; - if (fstNodeGetTransitionAt(node, i, &trn) && trn.inp > b) { break; } + if (fstNodeGetTransitionAt(node, i, &trn) && trn.inp > b) { + break; + } } StreamState s = {.node = node, .trans = i, .out = {.null = false, .out = out}, .autState = autState}; @@ -1248,7 +1306,9 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb while (taosArrayGetSize(sws->stack) > 0) { StreamState* p = (StreamState*)taosArrayPop(sws->stack); if (p->trans >= FST_NODE_LEN(p->node) || !automFuncs[aut->type].canMatch(aut, p->autState)) { - if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { taosArrayPop(sws->inp); } + if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { + taosArrayPop(sws->inp); + } streamStateDestroy(p); continue; } @@ -1267,7 +1327,9 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb if (FST_NODE_IS_FINAL(nextNode)) { // void *eofState = sws->aut->acceptEof(nextState); void* eofState = automFuncs[aut->type].acceptEof(aut, nextState); - if (eofState != NULL) { isMatch = automFuncs[aut->type].isMatch(aut, eofState); } + if (eofState != NULL) { + isMatch = automFuncs[aut->type].isMatch(aut, eofState); + } } StreamState s1 = {.node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState}; taosArrayPush(sws->stack, &s1); @@ -1277,24 +1339,26 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb size_t isz = taosArrayGetSize(sws->inp); uint8_t* buf = (uint8_t*)malloc(isz * sizeof(uint8_t)); - for (uint32_t i = 0; i < isz; i++) { buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i); } + for (uint32_t i = 0; i < isz; i++) { + buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i); + } FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp)); if (fstBoundWithDataExceededBy(sws->endAt, &slice)) { taosArrayDestroyEx(sws->stack, streamStateDestroy); sws->stack = (SArray*)taosArrayInit(256, sizeof(StreamState)); - free(buf); + tfree(buf); fstSliceDestroy(&slice); return NULL; } if (FST_NODE_IS_FINAL(nextNode) && isMatch) { FstOutput fOutput = {.null = false, .out = out + FST_NODE_FINAL_OUTPUT(nextNode)}; StreamWithStateResult* result = swsResultCreate(&slice, fOutput, tState); - free(buf); + tfree(buf); fstSliceDestroy(&slice); taosArrayDestroy(nodes); return result; } - free(buf); + tfree(buf); fstSliceDestroy(&slice); } for (size_t i = 0; i < taosArrayGetSize(nodes); i++) { @@ -1307,16 +1371,19 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb StreamWithStateResult* swsResultCreate(FstSlice* data, FstOutput fOut, void* state) { StreamWithStateResult* result = calloc(1, sizeof(StreamWithStateResult)); - if (result == NULL) { return NULL; } + if (result == NULL) { + return NULL; + } result->data = fstSliceCopy(data, 0, FST_SLICE_LEN(data) - 1); result->out = fOut; result->state = state; - return result; } void swsResultDestroy(StreamWithStateResult* result) { - if (NULL == result) { return; } + if (NULL == result) { + return; + } fstSliceDestroy(&result->data); startWithStateValueDestroy(result->state); @@ -1324,16 +1391,18 @@ void swsResultDestroy(StreamWithStateResult* result) { } void streamStateDestroy(void* s) { - if (NULL == s) { return; } + if (NULL == s) { + return; + } StreamState* ss = (StreamState*)s; - fstNodeDestroy(ss->node); - // free(s->autoState); } FstStreamBuilder* fstStreamBuilderCreate(Fst* fst, AutomationCtx* aut) { FstStreamBuilder* b = calloc(1, sizeof(FstStreamBuilder)); - if (NULL == b) { return NULL; } + if (NULL == b) { + return NULL; + } b->fst = fst; b->aut = aut; @@ -1349,8 +1418,9 @@ void fstStreamBuilderDestroy(FstStreamBuilder* b) { free(b); } FstStreamBuilder* fstStreamBuilderRange(FstStreamBuilder* b, FstSlice* val, RangeType type) { - if (b == NULL) { return NULL; } - + if (b == NULL) { + return NULL; + } if (type == GE) { b->min->type = Included; fstSliceDestroy(&(b->min->data)); diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c index c6e3cee3e3..590ff294bf 100644 --- a/source/libs/index/src/index_fst_automation.c +++ b/source/libs/index/src/index_fst_automation.c @@ -17,7 +17,9 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) { StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue)); - if (nsv == NULL) { return NULL; } + if (nsv == NULL) { + return NULL; + } nsv->kind = kind; nsv->type = ty; @@ -35,7 +37,9 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueTyp } void startWithStateValueDestroy(void* val) { StartWithStateValue* sv = (StartWithStateValue*)val; - if (sv == NULL) { return; } + if (sv == NULL) { + return; + } if (sv->type == FST_INT) { // @@ -48,7 +52,9 @@ void startWithStateValueDestroy(void* val) { } StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) { StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue)); - if (nsv == NULL) { return NULL; } + if (nsv == NULL) { + return NULL; + } nsv->kind = sv->kind; nsv->type = sv->type; @@ -88,10 +94,14 @@ static bool prefixCanMatch(AutomationCtx* ctx, void* sv) { static bool prefixWillAlwaysMatch(AutomationCtx* ctx, void* state) { return true; } static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) { StartWithStateValue* ssv = (StartWithStateValue*)state; - if (ssv == NULL || ctx == NULL) { return NULL; } + if (ssv == NULL || ctx == NULL) { + return NULL; + } char* data = ctx->data; - if (ssv->kind == Done) { return startWithStateValueCreate(Done, FST_INT, &ssv->val); } + if (ssv->kind == Done) { + return startWithStateValueCreate(Done, FST_INT, &ssv->val); + } if ((strlen(data) > ssv->val) && data[ssv->val] == byte) { int val = ssv->val + 1; @@ -128,7 +138,9 @@ AutomationFunc automFuncs[] = { AutomationCtx* automCtxCreate(void* data, AutomationType atype) { AutomationCtx* ctx = calloc(1, sizeof(AutomationCtx)); - if (ctx == NULL) { return NULL; } + if (ctx == NULL) { + return NULL; + } StartWithStateValue* sv = NULL; if (atype == AUTOMATION_ALWAYS) { diff --git a/source/libs/index/src/index_fst_util.c b/source/libs/index/src/index_fst_util.c index da1e177a18..f08a48c34e 100644 --- a/source/libs/index/src/index_fst_util.c +++ b/source/libs/index/src/index_fst_util.c @@ -29,18 +29,6 @@ const uint64_t VERSION = 3; const uint64_t TRANS_INDEX_THRESHOLD = 32; -// uint8_t commonInput(uint8_t idx) { -// if (idx == 0) { return -1; } -// else { -// return COMMON_INPUTS_INV[idx - 1]; -// } -//} -// -// uint8_t commonIdx(uint8_t v, uint8_t max) { -// uint8_t v = ((uint16_t)tCOMMON_INPUTS[v] + 1)%256; -// return v > max ? 0: v; -//} - uint8_t packSize(uint64_t n) { if (n < (1u << 8)) { return 1; @@ -103,9 +91,6 @@ FstSlice fstSliceCreate(uint8_t* data, uint64_t len) { FstSlice fstSliceCopy(FstSlice* s, int32_t start, int32_t end) { FstString* str = s->str; str->ref++; - // uint8_t *buf = fstSliceData(s, &alen); - // start = buf + start - (buf - s->start); - // end = buf + end - (buf - s->start); FstSlice t = {.str = str, .start = start + s->start, .end = end + s->start}; return t; @@ -130,19 +115,19 @@ FstSlice fstSliceDeepCopy(FstSlice* s, int32_t start, int32_t end) { ans.end = tlen - 1; return ans; } -bool fstSliceIsEmpty(FstSlice* s) { - return s->str == NULL || s->str->len == 0 || s->start < 0 || s->end < 0; -} +bool fstSliceIsEmpty(FstSlice* s) { return s->str == NULL || s->str->len == 0 || s->start < 0 || s->end < 0; } uint8_t* fstSliceData(FstSlice* s, int32_t* size) { FstString* str = s->str; - if (size != NULL) { *size = s->end - s->start + 1; } + if (size != NULL) { + *size = s->end - s->start + 1; + } return str->data + s->start; } void fstSliceDestroy(FstSlice* s) { FstString* str = s->str; str->ref--; - if (str->ref <= 0) { + if (str->ref == 0) { free(str->data); free(str); s->str = NULL; diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 98fede4f7b..87cc48146b 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -13,8 +13,6 @@ p * * along with this program. If not, see . */ -//#include -//#include #include "index_tfile.h" #include "index.h" #include "index_fst.h" @@ -61,7 +59,9 @@ static void tfileGenFileFullName(char* fullname, const char* path, uint64_t s TFileCache* tfileCacheCreate(const char* path) { TFileCache* tcache = calloc(1, sizeof(TFileCache)); - if (tcache == NULL) { return NULL; } + if (tcache == NULL) { + return NULL; + } tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); tcache->capacity = 64; @@ -98,7 +98,9 @@ End: return NULL; } void tfileCacheDestroy(TFileCache* tcache) { - if (tcache == NULL) { return; } + if (tcache == NULL) { + return; + } // free table cache TFileReader** reader = taosHashIterate(tcache->tableCache, NULL); @@ -119,7 +121,9 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) { int32_t sz = indexSerialCacheKey(key, buf); assert(sz < sizeof(buf)); TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz); - if (reader == NULL) { return NULL; } + if (reader == NULL) { + return NULL; + } tfileReaderRef(*reader); return *reader; @@ -142,7 +146,9 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { } TFileReader* tfileReaderCreate(WriterCtx* ctx) { TFileReader* reader = calloc(1, sizeof(TFileReader)); - if (reader == NULL) { return NULL; } + if (reader == NULL) { + return NULL; + } reader->ctx = ctx; @@ -169,7 +175,9 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { return reader; } void tfileReaderDestroy(TFileReader* reader) { - if (reader == NULL) { return; } + if (reader == NULL) { + return; + } // T_REF_INC(reader); fstDestroy(reader->fst); writerCtxDestroy(reader->ctx, reader->remove); @@ -209,7 +217,9 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c tfileGenFileFullName(fullname, path, suid, colName, version); // indexInfo("open write file name %s", fullname); WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64); - if (wcx == NULL) { return NULL; } + if (wcx == NULL) { + return NULL; + } TFileHeader tfh = {0}; tfh.suid = suid; @@ -225,7 +235,9 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024); indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size); - if (wc == NULL) { return NULL; } + if (wc == NULL) { + return NULL; + } TFileReader* reader = tfileReaderCreate(wc); return reader; @@ -316,19 +328,25 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { return 0; } void tfileWriterClose(TFileWriter* tw) { - if (tw == NULL) { return; } + if (tw == NULL) { + return; + } writerCtxDestroy(tw->ctx, false); free(tw); } void tfileWriterDestroy(TFileWriter* tw) { - if (tw == NULL) { return; } + if (tw == NULL) { + return; + } writerCtxDestroy(tw->ctx, false); free(tw); } IndexTFile* indexTFileCreate(const char* path) { TFileCache* cache = tfileCacheCreate(path); - if (cache == NULL) { return NULL; } + if (cache == NULL) { + return NULL; + } IndexTFile* tfile = calloc(1, sizeof(IndexTFile)); if (tfile == NULL) { @@ -340,21 +358,27 @@ IndexTFile* indexTFileCreate(const char* path) { return tfile; } void indexTFileDestroy(IndexTFile* tfile) { - if (tfile == NULL) { return; } + if (tfile == NULL) { + return; + } tfileCacheDestroy(tfile->cache); free(tfile); } int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { int ret = -1; - if (tfile == NULL) { return ret; } + if (tfile == NULL) { + return ret; + } IndexTFile* pTfile = (IndexTFile*)tfile; SIndexTerm* term = query->term; ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName}; TFileReader* reader = tfileCacheGet(pTfile->cache, &key); - if (reader == NULL) { return 0; } + if (reader == NULL) { + return 0; + } return tfileReaderSearch(reader, query, result); } @@ -373,7 +397,9 @@ static bool tfileIteratorNext(Iterate* iiter) { TFileFstIter* tIter = iiter->iter; StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL); - if (rt == NULL) { return false; } + if (rt == NULL) { + return false; + } int32_t sz = 0; char* ch = (char*)fstSliceData(&rt->data, &sz); @@ -383,7 +409,9 @@ static bool tfileIteratorNext(Iterate* iiter) { offset = (uint64_t)(rt->out.out); swsResultDestroy(rt); // set up iterate value - if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; } + if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { + return false; + } iv->colVal = colVal; return true; @@ -394,7 +422,9 @@ static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; } static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) { TFileFstIter* tIter = calloc(1, sizeof(TFileFstIter)); - if (tIter == NULL) { return NULL; } + if (tIter == NULL) { + return NULL; + } tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS); tIter->fb = fstSearch(reader->fst, tIter->ctx); @@ -404,7 +434,9 @@ static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) { } Iterate* tfileIteratorCreate(TFileReader* reader) { - if (reader == NULL) { return NULL; } + if (reader == NULL) { + return NULL; + } Iterate* iter = calloc(1, sizeof(Iterate)); iter->iter = tfileFstIteratorCreate(reader); @@ -419,7 +451,9 @@ Iterate* tfileIteratorCreate(TFileReader* reader) { return iter; } void tfileIteratorDestroy(Iterate* iter) { - if (iter == NULL) { return; } + if (iter == NULL) { + return; + } IterateValue* iv = &iter->val; iterateValueDestroy(iv, true); @@ -434,7 +468,9 @@ void tfileIteratorDestroy(Iterate* iter) { } TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) { - if (tf == NULL) { return NULL; } + if (tf == NULL) { + return NULL; + } ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; return tfileCacheGet(tf->cache, &key); } @@ -446,7 +482,9 @@ static int tfileUidCompare(const void* a, const void* b) { } static int tfileStrCompare(const void* a, const void* b) { int ret = strcmp((char*)a, (char*)b); - if (ret == 0) { return ret; } + if (ret == 0) { + return ret; + } return ret < 0 ? -1 : 1; } @@ -461,13 +499,17 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) { TFileValue* tfileValueCreate(char* val) { TFileValue* tf = calloc(1, sizeof(TFileValue)); - if (tf == NULL) { return NULL; } + if (tf == NULL) { + return NULL; + } tf->colVal = tstrdup(val); tf->tableId = taosArrayInit(32, sizeof(uint64_t)); return tf; } int tfileValuePush(TFileValue* tf, uint64_t val) { - if (tf == NULL) { return -1; } + if (tf == NULL) { + return -1; + } taosArrayPush(tf->tableId, &val); return 0; } @@ -489,7 +531,9 @@ static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) { int32_t fstOffset = offset + sizeof(tw->header.fstOffset); tw->header.fstOffset = fstOffset; - if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; } + if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { + return -1; + } indexInfo("tfile write fst offset: %d", tw->ctx->size(tw->ctx)); tw->offset += sizeof(fstOffset); return 0; @@ -502,7 +546,9 @@ static int tfileWriteHeader(TFileWriter* writer) { indexInfo("tfile pre write header size: %d", writer->ctx->size(writer->ctx)); int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf)); - if (sizeof(buf) != nwrite) { return -1; } + if (sizeof(buf) != nwrite) { + return -1; + } indexInfo("tfile after write header size: %d", writer->ctx->size(writer->ctx)); writer->offset = nwrite; @@ -556,7 +602,9 @@ static int tfileReaderLoadFst(TFileReader* reader) { static int FST_MAX_SIZE = 64 * 1024 * 1024; char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE); - if (buf == NULL) { return -1; } + if (buf == NULL) { + return -1; + } WriterCtx* ctx = reader->ctx; int size = ctx->size(ctx); @@ -586,12 +634,16 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* int32_t total = sizeof(uint64_t) * nid; char* buf = calloc(1, total); - if (buf == NULL) { return -1; } + if (buf == NULL) { + return -1; + } nread = ctx->readFrom(ctx, buf, total, offset + sizeof(nid)); assert(total == nread); - for (int32_t i = 0; i < nid; i++) { taosArrayPush(result, (uint64_t*)buf + i); } + for (int32_t i = 0; i < nid; i++) { + taosArrayPush(result, (uint64_t*)buf + i); + } free(buf); return 0; } @@ -615,13 +667,17 @@ static int tfileReaderVerify(TFileReader* reader) { } void tfileReaderRef(TFileReader* reader) { - if (reader == NULL) { return; } + if (reader == NULL) { + return; + } int ref = T_REF_INC(reader); UNUSED(ref); } void tfileReaderUnRef(TFileReader* reader) { - if (reader == NULL) { return; } + if (reader == NULL) { + return; + } int ref = T_REF_DEC(reader); if (ref == 0) { // do nothing @@ -637,11 +693,15 @@ static SArray* tfileGetFileList(const char* path) { uint32_t version; DIR* dir = opendir(path); - if (NULL == dir) { return NULL; } + if (NULL == dir) { + return NULL; + } struct dirent* entry; while ((entry = readdir(dir)) != NULL) { char* file = entry->d_name; - if (0 != tfileParseFileName(file, &suid, buf, &version)) { continue; } + if (0 != tfileParseFileName(file, &suid, buf, &version)) { + continue; + } size_t len = strlen(path) + 1 + strlen(file) + 1; char* buf = calloc(1, len); diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt index 3957554748..665dfd7318 100644 --- a/source/libs/index/test/CMakeLists.txt +++ b/source/libs/index/test/CMakeLists.txt @@ -1,5 +1,7 @@ add_executable(indexTest "") add_executable(fstTest "") +add_executable(fstUT "") + target_sources(indexTest PRIVATE "indexTests.cc" @@ -8,6 +10,11 @@ target_sources(fstTest PRIVATE "fstTest.cc" ) + +target_sources(fstUT + PRIVATE + "fstUT.cc" +) target_include_directories ( indexTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/index" @@ -18,6 +25,12 @@ target_include_directories ( fstTest "${CMAKE_SOURCE_DIR}/include/libs/index" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) + +target_include_directories ( fstUT + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/index" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries (indexTest os util @@ -32,6 +45,13 @@ target_link_libraries (fstTest gtest_main index ) +target_link_libraries (fstUT + os + util + common + gtest_main + index +) #add_test( diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index a2c0046f9a..65118a2bce 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -58,7 +58,9 @@ class FstReadMemory { bool init() { char* buf = (char*)calloc(1, sizeof(char) * _size); int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size); - if (nRead <= 0) { return false; } + if (nRead <= 0) { + return false; + } _size = nRead; _s = fstSliceCreate((uint8_t*)buf, _size); _fst = fstCreate(&_s); @@ -97,7 +99,8 @@ class FstReadMemory { printf("key: %s, val: %" PRIu64 "\n", key.c_str(), (uint64_t)(rt->out.out)); swsResultDestroy(rt); } - for (size_t i = 0; i < result.size(); i++) {} + for (size_t i = 0; i < result.size(); i++) { + } std::cout << std::endl; return true; } @@ -173,7 +176,9 @@ void checkMillonWriteAndReadOfFst() { delete fw; FstReadMemory* fr = new FstReadMemory(1024 * 64 * 1024); - if (fr->init()) { printf("success to init fst read"); } + if (fr->init()) { + printf("success to init fst read"); + } Performance_fstReadRecords(fr); tfCleanup(); diff --git a/source/libs/index/test/fstUT.cc b/source/libs/index/test/fstUT.cc new file mode 100644 index 0000000000..47215693bb --- /dev/null +++ b/source/libs/index/test/fstUT.cc @@ -0,0 +1,232 @@ + +#include +#include +#include +#include +#include +#include +#include "index.h" +#include "indexInt.h" +#include "index_cache.h" +#include "index_fst.h" +#include "index_fst_counting_writer.h" +#include "index_fst_util.h" +#include "index_tfile.h" +#include "tglobal.h" +#include "tskiplist.h" +#include "tutil.h" +#include "ulog.h" + +static std::string dir = "/tmp/index"; + +static char indexlog[PATH_MAX] = {0}; +static char tindex[PATH_MAX] = {0}; +static char tindexDir[PATH_MAX] = {0}; + +static void EnvInit() { + tfInit(); + + std::string path = dir; + taosRemoveDir(path.c_str()); + taosMkDir(path.c_str()); + // init log file + snprintf(indexlog, PATH_MAX, "%s/tindex.idx", path.c_str()); + if (taosInitLog(indexlog, tsNumOfLogLines, 1) != 0) { + printf("failed to init log"); + } + // init index file + memset(tindex, 0, sizeof(tindex)); + snprintf(tindex, PATH_MAX, "%s/tindex.idx", path.c_str()); +} +static void EnvCleanup() {} +class FstWriter { + public: + FstWriter() { + _wc = writerCtxCreate(TFile, tindex, false, 64 * 1024 * 1024); + _b = fstBuilderCreate(_wc, 0); + } + bool Put(const std::string& key, uint64_t val) { + // char buf[128] = {0}; + // int len = 0; + // taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len); + // FstSlice skey = fstSliceCreate((uint8_t*)buf, len); + FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size()); + bool ok = fstBuilderInsert(_b, skey, val); + + fstSliceDestroy(&skey); + return ok; + } + ~FstWriter() { + fstBuilderFinish(_b); + fstBuilderDestroy(_b); + + writerCtxDestroy(_wc, false); + } + + private: + FstBuilder* _b; + WriterCtx* _wc; +}; + +class FstReadMemory { + public: + FstReadMemory(size_t size) { + _wc = writerCtxCreate(TFile, tindex, true, 64 * 1024); + _w = fstCountingWriterCreate(_wc); + _size = size; + memset((void*)&_s, 0, sizeof(_s)); + } + bool init() { + char* buf = (char*)calloc(1, sizeof(char) * _size); + int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size); + if (nRead <= 0) { + return false; + } + _size = nRead; + _s = fstSliceCreate((uint8_t*)buf, _size); + _fst = fstCreate(&_s); + free(buf); + return _fst != NULL; + } + bool Get(const std::string& key, uint64_t* val) { + // char buf[128] = {0}; + // int len = 0; + // taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len); + // FstSlice skey = fstSliceCreate((uint8_t*)buf, len); + + FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size()); + bool ok = fstGet(_fst, &skey, val); + fstSliceDestroy(&skey); + return ok; + } + bool GetWithTimeCostUs(const std::string& key, uint64_t* val, uint64_t* elapse) { + int64_t s = taosGetTimestampUs(); + bool ok = this->Get(key, val); + int64_t e = taosGetTimestampUs(); + *elapse = e - s; + return ok; + } + // add later + bool Search(AutomationCtx* ctx, std::vector& result) { + FstStreamBuilder* sb = fstSearch(_fst, ctx); + StreamWithState* st = streamBuilderIntoStream(sb); + StreamWithStateResult* rt = NULL; + while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { + // result.push_back((uint64_t)(rt->out.out)); + FstSlice* s = &rt->data; + int32_t sz = 0; + char* ch = (char*)fstSliceData(s, &sz); + std::string key(ch, sz); + printf("key: %s, val: %" PRIu64 "\n", key.c_str(), (uint64_t)(rt->out.out)); + swsResultDestroy(rt); + } + for (size_t i = 0; i < result.size(); i++) { + } + std::cout << std::endl; + return true; + } + bool SearchWithTimeCostUs(AutomationCtx* ctx, std::vector& result) { + int64_t s = taosGetTimestampUs(); + bool ok = this->Search(ctx, result); + int64_t e = taosGetTimestampUs(); + return ok; + } + + ~FstReadMemory() { + fstCountingWriterDestroy(_w); + fstDestroy(_fst); + fstSliceDestroy(&_s); + writerCtxDestroy(_wc, false); + tfCleanup(); + } + + private: + FstCountingWriter* _w; + Fst* _fst; + FstSlice _s; + WriterCtx* _wc; + size_t _size; +}; + +class FstWriterEnv : public ::testing::Test { + protected: + virtual void SetUp() { fw = new FstWriter(); } + virtual void TearDown() { delete fw; } + FstWriter* fw = NULL; +}; + +class FstReadEnv : public ::testing::Test { + protected: + virtual void SetUp() { fr = new FstReadMemory(1024); } + virtual void TearDown() { delete fr; } + FstReadMemory* fr = NULL; +}; + +class TFst { + public: + void CreateWriter() { fw = new FstWriter; } + void ReCreateWriter() { + if (fw != NULL) delete fw; + fw = new FstWriter; + } + void DestroyWriter() { + if (fw != NULL) delete fw; + } + void CreateReader() { + fr = new FstReadMemory(1024); + fr->init(); + } + void ReCreateReader() { + if (fr != NULL) delete fr; + fr = new FstReadMemory(1024); + } + void DestroyReader() { + delete fr; + fr = NULL; + } + bool Put(const std::string& k, uint64_t v) { + if (fw == NULL) { + return false; + } + return fw->Put(k, v); + } + bool Get(const std::string& k, uint64_t* v) { + if (fr == NULL) { + return false; + } + return fr->Get(k, v); + } + + private: + FstWriter* fw; + FstReadMemory* fr; +}; +class FstEnv : public ::testing::Test { + protected: + virtual void SetUp() { + EnvInit(); + fst = new TFst; + } + virtual void TearDown() { delete fst; } + TFst* fst; +}; + +TEST_F(FstEnv, writeNormal) { + fst->CreateWriter(); + std::string str("aa"); + for (int i = 0; i < 10; i++) { + str[0] = 'a' + i; + str.resize(2); + assert(fst->Put(str, i) == true); + } + // order failed + assert(fst->Put("aa", 1) == false); + + fst->DestroyWriter(); + + fst->CreateReader(); + uint64_t val; + assert(fst->Get("a", &val) == false); + assert(fst->Get("aa", &val) == true); + assert(val == 0); +} -- GitLab