From 7e3e6022db74290fa243cce37766745d63ac75b5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 27 Dec 2021 19:06:26 +0800 Subject: [PATCH] complete index write/search --- source/libs/index/inc/indexInt.h | 17 ++++- source/libs/index/inc/index_cache.h | 6 ++ source/libs/index/inc/index_fst.h | 2 + source/libs/index/inc/index_tfile.h | 10 +++ source/libs/index/src/index.c | 87 +++++++++++++++++++++- source/libs/index/src/index_cache.c | 52 +++++++++++++ source/libs/index/src/index_tfile.c | 111 ++++++++++++++++++++++++++++ 7 files changed, 282 insertions(+), 3 deletions(-) diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index a8f231da0a..048c9e804e 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -51,6 +51,8 @@ struct SIndex { int64_t suid; // current super table id, -1 is normal table int32_t cVersion; // current version allocated to cache + char* path; + SIndexStat stat; pthread_mutex_t mtx; }; @@ -87,12 +89,23 @@ typedef struct SIndexTermQuery { EIndexQueryType qType; } SIndexTermQuery; -typedef struct Iterate { - void* iter; +typedef struct Iterate Iterate; + +typedef struct IterateValue { int8_t type; char* colVal; SArray* val; +} IterateValue; + +typedef struct Iterate { + void* iter; + IterateValue val; + bool (*next)(Iterate* iter); + IterateValue* (*getValue)(Iterate* iter); } Iterate; + +void iterateValueDestroy(IterateValue* iv, bool destroy); + extern void* indexQhandle; int indexFlushCacheTFile(SIndex* sIdx, void*); diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 07b5b8d564..27f5a6fb20 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -41,6 +41,7 @@ typedef struct IndexCache { } IndexCache; +#define CACHE_VERSION(cache) atomic_load_32(&cache->version) typedef struct CacheTerm { // key int32_t nColVal; @@ -57,6 +58,9 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type); void indexCacheDestroy(void* cache); +Iterate* indexCacheIteratorCreate(IndexCache* cache); +void indexCacheIteratorDestroy(Iterate* iiter); + int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid); // int indexCacheGet(void *cache, uint64_t *rst); @@ -66,6 +70,8 @@ void indexCacheRef(IndexCache* cache); void indexCacheUnRef(IndexCache* cache); void indexCacheDebug(IndexCache* cache); + +void indexCacheDestroySkiplist(SSkipList* slt); #ifdef __cplusplus } #endif diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index 3c572787fc..73c79b2619 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -319,6 +319,8 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min); StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallback callback); FstStreamBuilder* fstStreamBuilderCreate(Fst* fst, AutomationCtx* aut); + +void fstStreamBuilderDestroy(FstStreamBuilder* b); // set up bound range // refator, simple code by marco diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index 550492ba50..f97a3126c8 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -113,6 +113,8 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArr void tfileReaderRef(TFileReader* reader); void tfileReaderUnRef(TFileReader* reader); +TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t type); +void tfileWriteClose(TFileWriter* tw); TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header); void tfileWriterDestroy(TFileWriter* tw); int tfileWriterPut(TFileWriter* tw, void* data); @@ -123,6 +125,14 @@ IndexTFile* indexTFileCreate(const char* path); int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid); int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result); +Iterate* tfileIteratorCreate(TFileReader* reader); +void tfileIteratorDestroy(Iterate* iterator); + +TFileValue* tfileValueCreate(char* val); + +int tfileValuePush(TFileValue* tf, uint64_t val); +void tfileValueDestroy(TFileValue* tf); + #ifdef __cplusplus } diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 3f871af01d..23df5f9f9a 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -75,9 +75,12 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { sIdx->tindex = indexTFileCreate(path); sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); sIdx->cVersion = 1; + sIdx->path = calloc(1, strlen(path) + 1); + memcpy(sIdx->path, path, strlen(path)); pthread_mutex_init(&sIdx->mtx, NULL); *index = sIdx; + return 0; #endif @@ -361,14 +364,96 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType } return 0; } + int indexFlushCacheTFile(SIndex* sIdx, void* cache) { if (sIdx == NULL) { return -1; } indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); - IndexCache* pCache = (IndexCache*)cache; + IndexCache* pCache = (IndexCache*)cache; TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName); + // handle flush + Iterate* cacheIter = indexCacheIteratorCreate(pCache); + Iterate* tfileIter = tfileIteratorCreate(pReader); + + SArray* result = taosArrayInit(1024, sizeof(void*)); + + bool cn = cacheIter->next(cacheIter); + bool tn = tfileIter->next(tfileIter); + while (cn == true && tn == true) { + IterateValue* cv = cacheIter->getValue(cacheIter); + IterateValue* tv = tfileIter->getValue(tfileIter); + + // dump value + int comp = strcmp(cv->colVal, tv->colVal); + if (comp == 0) { + TFileValue* tfv = tfileValueCreate(cv->colVal); + taosArrayAddAll(tfv->tableId, cv->val); + taosArrayAddAll(tfv->tableId, tv->val); + taosArrayPush(result, &tfv); + + cn = cacheIter->next(cacheIter); + tn = tfileIter->next(tfileIter); + continue; + } else if (comp < 0) { + TFileValue* tfv = tfileValueCreate(cv->colVal); + taosArrayAddAll(tfv->tableId, cv->val); + taosArrayPush(result, &tfv); + + // copy to final Result; + cn = cacheIter->next(cacheIter); + } else { + TFileValue* tfv = tfileValueCreate(tv->colVal); + taosArrayPush(result, &tfv); + taosArrayAddAll(tfv->tableId, tv->val); + // copy to final result + tn = tfileIter->next(tfileIter); + } + } + while (cn == true) { + IterateValue* cv = cacheIter->getValue(cacheIter); + TFileValue* tfv = tfileValueCreate(cv->colVal); + taosArrayAddAll(tfv->tableId, cv->val); + taosArrayPush(result, &tfv); + cn = cacheIter->next(cacheIter); + } + while (tn == true) { + IterateValue* tv = tfileIter->getValue(tfileIter); + TFileValue* tfv = tfileValueCreate(tv->colVal); + taosArrayAddAll(tfv->tableId, tv->val); + taosArrayPush(result, &tfv); + tn = tfileIter->next(tfileIter); + } + + int32_t version = CACHE_VERSION(pCache); + uint8_t colType = pCache->type; + + TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, pCache->colName, colType); + if (tw == NULL) { + indexError("faile to open file to write"); + } else { + int ret = tfileWriterPut(tw, result); + if (ret != 0) { indexError("faile to write into tindex "); } + } + // not free later, just put int table cache + SSkipList* timm = (SSkipList*)pCache->imm; + pCache->imm = NULL; // or throw int bg thread + indexCacheDestroySkiplist(timm); + + tfileWriteClose(tw); + indexCacheIteratorDestroy(cacheIter); + tfileIteratorDestroy(tfileIter); + tfileReaderUnRef(pReader); indexCacheUnRef(pCache); return 0; } +void iterateValueDestroy(IterateValue* value, bool destroy) { + if (destroy) { + taosArrayDestroy(value->val); + } else { + taosArrayClear(value->val); + } + free(value->colVal); + value->colVal = NULL; +} diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 8181c17505..54aee8858b 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -94,6 +94,16 @@ void indexCacheDebug(IndexCache* cache) { tSkipListDestroyIter(iter); } +void indexCacheDestroySkiplist(SSkipList* slt) { + SSkipListIterator* iter = tSkipListCreateIter(slt); + while (tSkipListIterNext(iter)) { + SSkipListNode* node = tSkipListIterGet(iter); + CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); + if (ct != NULL) {} + } + tSkipListDestroyIter(iter); +} + void indexCacheDestroy(void* cache) { IndexCache* pCache = cache; if (pCache == NULL) { return; } @@ -108,6 +118,48 @@ static void doMergeWork(SSchedMsg* msg) { SIndex* sidx = (SIndex*)pCache->index; indexFlushCacheTFile(sidx, pCache); } +static bool indexCacheIteratorNext(Iterate* itera) { + SSkipListIterator* iter = itera->iter; + if (iter == NULL) { return false; } + + IterateValue* iv = &itera->val; + iterateValueDestroy(iv, false); + + bool next = tSkipListIterNext(iter); + if (next) { + SSkipListNode* node = tSkipListIterGet(iter); + CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); + + iv->type = ct->operaType; + iv->colVal = ct->colVal; + + taosArrayPush(iv->val, &ct->uid); + } + + return next; +} + +static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { + return &iter->val; +} +Iterate* indexCacheIteratorCreate(IndexCache* cache) { + Iterate* iiter = calloc(1, sizeof(Iterate)); + if (iiter == NULL) { return NULL; } + + iiter->val.val = taosArrayInit(1, sizeof(uint64_t)); + iiter->iter = cache->imm != NULL ? tSkipListCreateIter(cache->imm) : NULL; + iiter->next = indexCacheIteratorNext; + iiter->getValue = indexCacheIteratorGetValue; + + return iiter; +} +void indexCacheIteratorDestroy(Iterate* iter) { + if (iter == NULL) { return; } + + tSkipListDestroyIter(iter->iter); + iterateValueDestroy(&iter->val, true); + free(iter); +} int indexCacheSchedToMerge(IndexCache* pCache) { SSchedMsg schedMsg = {0}; diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 0dfb14cc8d..fc31ff3c29 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -23,6 +23,13 @@ #include "taosdef.h" #include "tcompare.h" +typedef struct TFileFstIter { + FstStreamBuilder* fb; + StreamWithState* st; + AutomationCtx* ctx; + TFileReader* rdr; +} TFileFstIter; + #define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + sz * sizeof(uint64_t)) static int tfileStrCompare(const void* a, const void* b); @@ -184,6 +191,23 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul return ret; } +TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) { + char filename[128] = {0}; + int32_t coldId = 1; + tfileGenFileName(filename, suid, coldId, version); + + char fullname[256] = {0}; + snprintf(fullname, sizeof(fullname), "%s/%s", path, filename); + WriterCtx* wcx = writerCtxCreate(TFile, fullname, true, 1024 * 1024); + + TFileHeader tfh = {0}; + tfh.suid = suid; + tfh.version = version; + memcpy(tfh.colName, colName, strlen(colName)); + tfh.colType = colType; + + return tfileWriterCreate(wcx, &tfh); +} TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { // char pathBuf[128] = {0}; // sprintf(pathBuf, "%s/% " PRIu64 "-%d-%d.tindex", path, suid, colId, version); @@ -279,6 +303,11 @@ int tfileWriterPut(TFileWriter* tw, void* data) { tw->fb = NULL; return 0; } +void tfileWriteClose(TFileWriter* tw) { + if (tw == NULL) { return; } + writerCtxDestroy(tw->ctx); + free(tw); +} void tfileWriterDestroy(TFileWriter* tw) { if (tw == NULL) { return; } @@ -314,6 +343,71 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) { return 0; } +static bool tfileIteratorNext(Iterate* iiter) { + IterateValue* iv = &iiter->val; + iterateValueDestroy(iv, false); + // SArray* tblIds = iv->val; + + char* colVal = NULL; + uint64_t offset = 0; + + TFileFstIter* tIter = iiter->iter; + StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL); + if (rt == NULL) { return false; } + + int32_t sz = 0; + char* ch = (char*)fstSliceData(&rt->data, &sz); + colVal = calloc(1, sz + 1); + memcpy(colVal, ch, sz); + + offset = (uint64_t)(rt->out.out); + + swsResultDestroy(rt); + // set up iterate value + if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; } + + iv->colVal = colVal; + + // std::string key(ch, sz); +} + +static IterateValue* tifileIterateGetValue(Iterate* iter) { + return &iter->val; +} + +static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) { + TFileFstIter* tIter = calloc(1, sizeof(Iterate)); + if (tIter == NULL) { return NULL; } + tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS); + tIter->fb = fstSearch(reader->fst, tIter->ctx); + tIter->st = streamBuilderIntoStream(tIter->fb); + tIter->rdr = reader; + return tIter; +} + +Iterate* tfileIteratorCreate(TFileReader* reader) { + Iterate* iter = calloc(1, sizeof(Iterate)); + + iter->iter = tfileFstIteratorCreate(reader); + if (iter->iter == NULL) { return NULL; } + + iter->next = tfileIteratorNext; + iter->getValue = tifileIterateGetValue; + return iter; +} +void tfileIteratorDestroy(Iterate* iter) { + if (iter == NULL) { return; } + IterateValue* iv = &iter->val; + iterateValueDestroy(iv, true); + + TFileFstIter* tIter = iter->iter; + streamWithStateDestroy(tIter->st); + fstStreamBuilderDestroy(tIter->fb); + automCtxDestroy(tIter->ctx); + + free(iter); +} + TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) { if (tf == NULL) { return NULL; } TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; @@ -334,6 +428,23 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) { return fn(av->colVal, bv->colVal); } + +TFileValue* tfileValueCreate(char* val) { + TFileValue* tf = calloc(1, sizeof(TFileValue)); + if (tf == NULL) { return NULL; } + + tf->tableId = taosArrayInit(32, sizeof(uint64_t)); + return tf; +} +int tfileValuePush(TFileValue* tf, uint64_t val) { + if (tf == NULL) { return -1; } + taosArrayPush(tf->tableId, &val); + return 0; +} +void tfileValueDestroy(TFileValue* tf) { + taosArrayDestroy(tf->tableId); + free(tf); +} static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) { int sz = taosArrayGetSize(ids); SERIALIZE_VAR_TO_BUF(buf, sz, int32_t); -- GitLab