diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 716c9c23693d062ba902e4fd2fa48faa402394a8..5079fc6c063cdc348682369ece69b90314389ee3 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -96,8 +96,10 @@ typedef struct SIndexTermQuery { typedef struct Iterate Iterate; typedef struct IterateValue { - int8_t type; // opera type, ADD_VALUE/DELETE_VALUE - char* colVal; + int8_t type; // opera type, ADD_VALUE/DELETE_VALUE + uint64_t ver; // data ver, tfile data version is 0 + char* colVal; + SArray* val; } IterateValue; diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 1445a1bc562cde1cf51fa3e6cb5df057eaa3cc88..a6ebcd6d6ffbf700c482e8094754ebe1c7e54742 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -16,6 +16,7 @@ #define __INDEX_CACHE_H__ #include "indexInt.h" +#include "index_util.h" #include "tskiplist.h" // ----------------- key structure in skiplist --------------------- @@ -52,8 +53,9 @@ typedef struct CacheTerm { char* colVal; int32_t version; // value - uint64_t uid; - int8_t colType; + uint64_t uid; + int8_t colType; + SIndexOperOnColumn operaType; } CacheTerm; // @@ -68,7 +70,7 @@ void indexCacheIteratorDestroy(Iterate* iiter); int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid); // int indexCacheGet(void *cache, uint64_t *rst); -int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s); +int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s); void indexCacheRef(IndexCache* cache); void indexCacheUnRef(IndexCache* cache); diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index f676651e52262d80cd0c7c2f42ec0fa6c258e43b..3794898d3ad3cf3296a4aa44f79f12365c59c519 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -19,6 +19,7 @@ #include "index_fst.h" #include "index_fst_counting_writer.h" #include "index_tfile.h" +#include "index_util.h" #include "tlockfree.h" #ifdef __cplusplus @@ -103,7 +104,7 @@ TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName); TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName); TFileReader* tfileReaderCreate(WriterCtx* ctx); void tfileReaderDestroy(TFileReader* reader); -int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result); +int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr); void tfileReaderRef(TFileReader* reader); void tfileReaderUnRef(TFileReader* reader); @@ -118,7 +119,7 @@ int tfileWriterFinish(TFileWriter* tw); IndexTFile* indexTFileCreate(const char* path); void indexTFileDestroy(IndexTFile* tfile); int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid); -int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result); +int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* tr); Iterate* tfileIteratorCreate(TFileReader* reader); void tfileIteratorDestroy(Iterate* iterator); diff --git a/source/libs/index/inc/index_util.h b/source/libs/index/inc/index_util.h index 313839bf1da56bc40fa9cabb86cca9d28b6af89f..814d61afd737a2e455ff4670ee9192fe54ed3ec1 100644 --- a/source/libs/index/inc/index_util.h +++ b/source/libs/index/inc/index_util.h @@ -47,6 +47,19 @@ extern "C" { buf += len; \ } while (0) +#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \ + { \ + bool f = false; \ + for (int i = 0; i < taosArrayGetSize(src); i++) { \ + if (*(uint64_t *)taosArrayGet(src, i) == tgt) { \ + f = true; \ + } \ + } \ + if (f == false) { \ + taosArrayPush(dst, &tgt); \ + } \ + } + /* multi sorted result intersection * input: [1, 2, 4, 5] * [2, 3, 4, 5] @@ -66,10 +79,32 @@ void iUnion(SArray *interResults, SArray *finalResult); /* sorted array * total: [1, 2, 4, 5, 7, 8] * except: [4, 5] - * return: [1, 2, 7, 8] + * return: [1, 2, 7, 8] saved in total */ void iExcept(SArray *total, SArray *except); + +int uidCompare(const void *a, const void *b); + +// data with ver +typedef struct { + uint32_t ver; + uint64_t data; +} SIdxVerdata; + +typedef struct { + SArray *total; + SArray *added; + SArray *deled; +} SIdxTempResult; + +SIdxTempResult *sIdxTempResultCreate(); + +void sIdxTempResultClear(SIdxTempResult *tr); + +void sIdxTempResultDestroy(SIdxTempResult *tr); + +void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr); #ifdef __cplusplus } #endif diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 75dc05fc5b8cfdf254eebf9383723f82a3507bed..ae0a6c775e413cb95198b246fb5b74e3da41aca0 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -31,18 +31,6 @@ void* indexQhandle = NULL; -#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \ - { \ - bool f = false; \ - for (int i = 0; i < taosArrayGetSize(src); i++) { \ - if (*(uint64_t*)taosArrayGet(src, i) == tgt) { \ - f = true; \ - } \ - } \ - if (f == false) { \ - taosArrayPush(dst, &tgt); \ - } \ - } void indexInit() { // refactor later indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); @@ -52,23 +40,11 @@ void indexCleanUp() { taosCleanUpScheduler(indexQhandle); } -static int uidCompare(const void* a, const void* b) { - // add more version compare - uint64_t u1 = *(uint64_t*)a; - uint64_t u2 = *(uint64_t*)b; - return u1 - u2; -} typedef struct SIdxColInfo { int colId; // generated by index internal int cVersion; } SIdxColInfo; -typedef struct SIdxTempResult { - SArray* total; - SArray* added; - SArray* deled; -} SIdxTempResult; - static pthread_once_t isInit = PTHREAD_ONCE_INIT; // static void indexInit(); static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result); @@ -255,6 +231,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { #ifdef USE_INVERTED_INDEX + #endif return 1; @@ -363,22 +340,30 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result *result = taosArrayInit(4, sizeof(uint64_t)); // TODO: iterator mem and tidex STermValueType s = kTypeValue; - if (0 == indexCacheSearch(cache, query, *result, &s)) { + + SIdxTempResult* tr = sIdxTempResultCreate(); + if (0 == indexCacheSearch(cache, query, tr, &s)) { if (s == kTypeDeletion) { indexInfo("col: %s already drop by", term->colName); // coloum already drop by other oper, no need to query tindex return 0; } else { - if (0 != indexTFileSearch(sIdx->tindex, query, *result)) { + if (0 != indexTFileSearch(sIdx->tindex, query, tr)) { indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal); - return -1; + goto END; } } } else { indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal); - return -1; + goto END; } + + sIdxTempResultMergeTo(*result, tr); + sIdxTempResultDestroy(tr); return 0; +END: + sIdxTempResultDestroy(tr); + return -1; } static void indexInterResultsDestroy(SArray* results) { if (results == NULL) { @@ -413,43 +398,6 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType return 0; } -SIdxTempResult* sIdxTempResultCreate() { - SIdxTempResult* tr = calloc(1, sizeof(SIdxTempResult)); - tr->total = taosArrayInit(4, sizeof(uint64_t)); - tr->added = taosArrayInit(4, sizeof(uint64_t)); - tr->deled = taosArrayInit(4, sizeof(uint64_t)); - return tr; -} -void sIdxTempResultClear(SIdxTempResult* tr) { - if (tr == NULL) { - return; - } - taosArrayClear(tr->total); - taosArrayClear(tr->added); - taosArrayClear(tr->deled); -} -void sIdxTempResultDestroy(SIdxTempResult* tr) { - if (tr == NULL) { - return; - } - taosArrayDestroy(tr->total); - taosArrayDestroy(tr->added); - taosArrayDestroy(tr->deled); -} -static void sIdxTempResultMergeTo(SArray* result, SIdxTempResult* tr) { - taosArraySort(tr->total, uidCompare); - taosArraySort(tr->added, uidCompare); - taosArraySort(tr->deled, uidCompare); - - SArray* arrs = taosArrayInit(2, sizeof(void*)); - taosArrayPush(arrs, &tr->total); - taosArrayPush(arrs, &tr->added); - - iUnion(arrs, result); - taosArrayDestroy(arrs); - - iExcept(result, tr->deled); -} static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) { int32_t sz = taosArrayGetSize(result); if (sz > 0) { @@ -478,6 +426,7 @@ static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateVal if (cv != NULL) { uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0); + uint32_t ver = cv->ver; if (cv->type == ADD_VALUE) { INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id) } else if (cv->type == DEL_VALUE) { diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 1ac72e10a9bdb207b551578e0c34d367adde7fe1..d3b25afdbcd7d198d738eb2a5bce2ba6b43a2e4b 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -256,7 +256,7 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u return 0; } -static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) { +static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SIdxTempResult* tr, STermValueType* s) { if (mem == NULL) { return 0; } @@ -267,28 +267,23 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA SSkipListNode* node = tSkipListIterGet(iter); if (node != NULL) { CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); - // if (c->operaType == ADD_VALUE) { - //} else if (c->operaType == DEL_VALUE) { - //} - - if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) { - if (strcmp(c->colVal, ct->colVal) == 0) { - taosArrayPush(result, &c->uid); - *s = kTypeValue; - } else { - break; + if (qtype == QUERY_TERM) { + if (0 == strcmp(c->colVal, ct->colVal)) { + if (c->operaType == ADD_VALUE) { + INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) + // taosArrayPush(result, &c->uid); + *s = kTypeValue; + } else if (c->operaType == DEL_VALUE) { + INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) + } } - } else if (c->operaType == DEL_VALUE) { - // table is del, not need - *s = kTypeDeletion; - break; } } } tSkipListDestroyIter(iter); return 0; } -int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) { +int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) { if (cache == NULL) { return 0; } @@ -416,6 +411,7 @@ static bool indexCacheIteratorNext(Iterate* itera) { CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); iv->type = ct->operaType; + iv->ver = ct->version; iv->colVal = tstrdup(ct->colVal); taosArrayPush(iv->val, &ct->uid); diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index f5f46b061714fa372a2aa52607598cf69214990e..fd267fbf03d5742ab838581e735e7ba19f1a6254 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -184,12 +184,13 @@ void tfileReaderDestroy(TFileReader* reader) { free(reader); } -int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) { +int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) { SIndexTerm* term = query->term; bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); EIndexQueryType qtype = query->qType; - int ret = -1; + SArray* result = taosArrayInit(16, sizeof(uint64_t)); + int ret = -1; // refactor to callback later if (qtype == QUERY_TERM) { uint64_t offset; @@ -223,6 +224,10 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul // handle later } tfileReaderUnRef(reader); + + taosArrayAddAll(tr->total, result); + taosArrayDestroy(result); + return ret; } @@ -248,7 +253,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c tfileGenFileFullName(fullname, path, suid, colName, version); WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024); - indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size); + indexInfo("open read file name:%s, file size: %d", wc->file.buf, wc->file.size); if (wc == NULL) { return NULL; } @@ -380,7 +385,7 @@ void indexTFileDestroy(IndexTFile* tfile) { free(tfile); } -int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { +int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result) { int ret = -1; if (tfile == NULL) { return ret; @@ -428,6 +433,7 @@ static bool tfileIteratorNext(Iterate* iiter) { return false; } + iv->ver = 0; iv->type = ADD_VALUE; // value in tfile always ADD_VALUE iv->colVal = colVal; return true; @@ -628,7 +634,7 @@ static int tfileReaderLoadFst(TFileReader* reader) { int64_t ts = taosGetTimestampUs(); int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset); int64_t cost = taosGetTimestampUs() - ts; - indexInfo("nread = %d, and fst offset=%d, size: %d, filename: %s, size: %d, time cost: %" PRId64 "us", nread, + indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %" PRId64 "us", nread, reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost); // we assuse fst size less than FST_MAX_SIZE assert(nread > 0 && nread <= fstSize); diff --git a/source/libs/index/src/index_util.c b/source/libs/index/src/index_util.c index fc28484de9dd80fe9a609e457785bc4f035dabd1..dfe4e273a9e09255d2016b874e76ed108fc011c4 100644 --- a/source/libs/index/src/index_util.c +++ b/source/libs/index/src/index_util.c @@ -14,6 +14,7 @@ */ #include "index_util.h" #include "index.h" +#include "tcompare.h" typedef struct MergeIndex { int idx; @@ -135,3 +136,60 @@ void iExcept(SArray *total, SArray *except) { taosArrayPopTailBatch(total, tsz - vIdx); } + +int uidCompare(const void *a, const void *b) { + // add more version compare + uint64_t u1 = *(uint64_t *)a; + uint64_t u2 = *(uint64_t *)b; + return u1 - u2; +} +int verdataCompare(const void *a, const void *b) { + SIdxVerdata *va = (SIdxVerdata *)a; + SIdxVerdata *vb = (SIdxVerdata *)b; + + int32_t cmp = compareUint64Val(&va->data, &vb->data); + if (cmp == 0) { + cmp = 0 - compareUint32Val(&va->ver, &vb->data); + return cmp; + } + return cmp; +} + +SIdxTempResult *sIdxTempResultCreate() { + SIdxTempResult *tr = calloc(1, sizeof(SIdxTempResult)); + + tr->total = taosArrayInit(4, sizeof(uint64_t)); + tr->added = taosArrayInit(4, sizeof(uint64_t)); + tr->deled = taosArrayInit(4, sizeof(uint64_t)); + return tr; +} +void sIdxTempResultClear(SIdxTempResult *tr) { + if (tr == NULL) { + return; + } + taosArrayClear(tr->total); + taosArrayClear(tr->added); + taosArrayClear(tr->deled); +} +void sIdxTempResultDestroy(SIdxTempResult *tr) { + if (tr == NULL) { + return; + } + taosArrayDestroy(tr->total); + taosArrayDestroy(tr->added); + taosArrayDestroy(tr->deled); +} +void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr) { + taosArraySort(tr->total, uidCompare); + taosArraySort(tr->added, uidCompare); + taosArraySort(tr->deled, uidCompare); + + SArray *arrs = taosArrayInit(2, sizeof(void *)); + taosArrayPush(arrs, &tr->total); + taosArrayPush(arrs, &tr->added); + + iUnion(arrs, result); + taosArrayDestroy(arrs); + + iExcept(result, tr->deled); +} diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index a50e91b0944870d4c658c5d51ec38028aa0d98a6..3f46a042ae4777f0d53de66df55b3dbf9f42c677 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -24,6 +24,7 @@ #include "index_fst_counting_writer.h" #include "index_fst_util.h" #include "index_tfile.h" +#include "index_util.h" #include "tskiplist.h" #include "tutil.h" using namespace std; @@ -393,7 +394,13 @@ class TFileObj { // // } - return tfileReaderSearch(reader_, query, result); + SIdxTempResult* tr = sIdxTempResultCreate(); + + int ret = tfileReaderSearch(reader_, query, tr); + + sIdxTempResultMergeTo(result, tr); + sIdxTempResultDestroy(tr); + return ret; } ~TFileObj() { if (writer_) { @@ -507,9 +514,13 @@ class CacheObj { indexCacheDebug(cache); } int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { - int ret = indexCacheSearch(cache, query, result, s); + SIdxTempResult* tr = sIdxTempResultCreate(); + + int ret = indexCacheSearch(cache, query, tr, s); + sIdxTempResultMergeTo(result, tr); + sIdxTempResultDestroy(tr); + if (ret != 0) { - // std::cout << "failed to get from cache:" << ret << std::endl; } return ret; @@ -649,7 +660,7 @@ class IndexObj { indexInit(); } int Init(const std::string& dir) { - // taosRemoveDir(dir.c_str()); + taosRemoveDir(dir.c_str()); taosMkDir(dir.c_str()); int ret = indexOpen(&opts, dir.c_str(), &idx); if (ret != 0) { @@ -658,6 +669,14 @@ class IndexObj { } return ret; } + void Del(const std::string& colName, const std::string& colVal, uint64_t uid) { + SIndexTerm* term = indexTermCreate(0, DEL_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + Put(terms, uid); + indexMultiTermDestroy(terms); + } int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world", size_t numOfTable = 100 * 10000) { SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), @@ -730,6 +749,7 @@ class IndexObj { std::cout << "search and time cost:" << e - s << "\tquery col:" << colName << "\t val: " << colVal << "\t size:" << taosArrayGetSize(result) << std::endl; } else { + return -1; } int sz = taosArrayGetSize(result); indexMultiTermQueryDestroy(mq); @@ -797,13 +817,9 @@ class IndexObj { class IndexEnv2 : public ::testing::Test { protected: - virtual void SetUp() { - index = new IndexObj(); - } - virtual void TearDown() { - delete index; - } - IndexObj* index; + virtual void SetUp() { index = new IndexObj(); } + virtual void TearDown() { delete index; } + IndexObj* index; }; TEST_F(IndexEnv2, testIndexOpen) { std::string path = "/tmp/test"; @@ -1042,3 +1058,19 @@ TEST_F(IndexEnv2, testIndex_read_performance4) { std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; assert(3 == index->SearchOne("tag10", "Hello")); } +TEST_F(IndexEnv2, testIndex_del) { + std::string path = "/tmp/cache_and_tfile"; + if (index->Init(path) != 0) { + } + for (int i = 0; i < 100; i++) { + index->PutOneTarge("tag10", "Hello", i); + } + index->Del("tag10", "Hello", 12); + index->Del("tag10", "Hello", 11); + + index->WriteMultiMillonData("tag10", "xxxxxxxxxxxxxx", 100 * 10000); + + EXPECT_EQ(98, index->SearchOne("tag10", "Hello")); + // std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; + // assert(3 == index->SearchOne("tag10", "Hello")); +}