diff --git a/include/util/tfile.h b/include/util/tfile.h index af4c19e7d19ebcd6fb9d24f435bb51073cd9836b..b3d141c4434154bf0ac6887aae1dc0e0a12b8efa 100644 --- a/include/util/tfile.h +++ b/include/util/tfile.h @@ -38,6 +38,7 @@ int64_t tfOpenCreateWriteAppend(const char *pathname); int64_t tfClose(int64_t tfd); int64_t tfWrite(int64_t tfd, void *buf, int64_t count); int64_t tfRead(int64_t tfd, void *buf, int64_t count); +int64_t tfPread(int64_t tfd, void *buf, int64_t count, int64_t offset); int32_t tfFsync(int64_t tfd); bool tfValid(int64_t tfd); int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence); @@ -47,4 +48,4 @@ int32_t tfFtruncate(int64_t tfd, int64_t length); } #endif -#endif /*_TD_UTIL_FILE_H*/ +#endif /*_TD_UTIL_FILE_H*/ diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index 73c79b2619512a3f769911f86d2ae65fe45af8b6..072ec93e4e648ae55c945f291e0c0d73dc51cbc0 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -142,7 +142,8 @@ uint64_t fstStateInputLen(FstState* state); // end_addr uint64_t fstStateEndAddrForOneTransNext(FstState* state, FstSlice* data); uint64_t fstStateEndAddrForOneTrans(FstState* state, FstSlice* data, PackSizes sizes); -uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, uint64_t nTrans); +uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, + uint64_t nTrans); // input uint8_t fstStateInput(FstState* state, FstNode* node); uint8_t fstStateInputForAnyTrans(FstState* state, FstNode* node, uint64_t i); @@ -255,9 +256,10 @@ typedef struct FstMeta { } FstMeta; typedef struct Fst { - FstMeta* meta; - FstSlice* data; // - FstNode* root; // + FstMeta* meta; + FstSlice* data; // + FstNode* root; // + pthread_mutex_t mtx; } Fst; // refactor simple function @@ -310,7 +312,8 @@ StreamWithStateResult* swsResultCreate(FstSlice* data, FstOutput fOut, void* sta void swsResultDestroy(StreamWithStateResult* result); typedef void* (*StreamCallback)(void*); -StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, FstBoundWithData* max); +StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, + FstBoundWithData* max); void streamWithStateDestroy(StreamWithState* sws); diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index d8e69b5fdece6eeae9d0ce4809fb2d04f2b91e4c..4928e01a6322ae487c00580d935ba37d9f0c00fd 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -77,6 +77,7 @@ typedef struct TFileReader { Fst* fst; WriterCtx* ctx; TFileHeader header; + bool remove; } TFileReader; typedef struct IndexTFile { diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 6d64bdbd65858fe42fba06f090d69ad357e1ab36..5167196031eb822b5540fa3c7b224e64ac859b0a 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -94,7 +94,6 @@ void indexClose(SIndex* sIdx) { #endif #ifdef USE_INVERTED_INDEX - indexCacheDestroy(sIdx->cache); void* iter = taosHashIterate(sIdx->colObj, NULL); while (iter) { IndexCache** pCache = iter; @@ -104,6 +103,7 @@ void indexClose(SIndex* sIdx) { taosHashCleanup(sIdx->colObj); pthread_mutex_destroy(&sIdx->mtx); #endif + free(sIdx->path); free(sIdx); return; } diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 882091afb9a64f271b0bad4ada809f13d285a82b..0e46445a00db0a2bcc4c40b03b4b6ea95fa64e52 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -20,7 +20,7 @@ #define MAX_INDEX_KEY_LEN 256 // test only, change later -#define MEM_TERM_LIMIT 200 +#define MEM_TERM_LIMIT 10000 * 10 // ref index_cache.h:22 //#define CACHE_KEY_LEN(p) \ // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + @@ -110,7 +110,10 @@ void indexCacheDestroySkiplist(SSkipList* slt) { while (tSkipListIterNext(iter)) { SSkipListNode* node = tSkipListIterGet(iter); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); - if (ct != NULL) {} + if (ct != NULL) { + free(ct->colVal); + free(ct); + } } tSkipListDestroyIter(iter); tSkipListDestroy(slt); @@ -271,7 +274,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV SIndexTerm* term = query->term; EIndexQueryType qtype = query->qType; CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)}; - indexCacheDebug(pCache); + // indexCacheDebug(pCache); int ret = indexQueryMem(mem, &ct, qtype, result, s); if (ret == 0 && *s != kTypeDeletion) { diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 18024fa39110c3a6acb3052e76e84c332ef425bf..04a08dafd2641f7fd1f91d660d8c4ff4013abeb2 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -354,7 +354,8 @@ uint64_t fstStateEndAddrForOneTrans(FstState* s, FstSlice* data, PackSizes sizes return FST_SLICE_LEN(data) - 1 - fstStateInputLen(s) - 1 // pack size - FST_GET_TRANSITION_PACK_SIZE(sizes) - FST_GET_OUTPUT_PACK_SIZE(sizes); } -uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, uint64_t nTrans) { +uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, + uint64_t nTrans) { uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(sizes); uint8_t finalOsize = !fstStateIsFinalState(state) ? 0 : oSizes; return FST_SLICE_LEN(date) - 1 - fstStateNtransLen(state) - 1 // pack size @@ -403,8 +404,8 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState* s, FstNode* node, uint64_t i FstSlice* slice = &node->data; uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes); - uint64_t at = node->start - fstStateNtransLen(s) - 1 - fstStateTransIndexSize(s, node->version, node->nTrans) - node->nTrans - - (i * tSizes) - tSizes; + uint64_t at = node->start - fstStateNtransLen(s) - 1 - fstStateTransIndexSize(s, node->version, node->nTrans) - + node->nTrans - (i * tSizes) - tSizes; uint8_t* data = fstSliceData(slice, NULL); return unpackDelta(data + at, tSizes, node->end); } @@ -595,7 +596,8 @@ FstNode* fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice* slice) { n->isFinal = fstStateIsFinalState(&st); // s.is_final_state(); n->nTrans = nTrans; n->sizes = sz; - n->finalOutput = fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans); + n->finalOutput = + fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans); } return n; } @@ -875,9 +877,7 @@ void* fstBuilderInsertInner(FstBuilder* b) { // b->wrt = NULL; return b->wrt; } -void fstBuilderFinish(FstBuilder* b) { - fstBuilderInsertInner(b); -} +void fstBuilderFinish(FstBuilder* b) { fstBuilderInsertInner(b); } FstSlice fstNodeAsSlice(FstNode* node) { FstSlice* slice = &node->data; @@ -894,9 +894,7 @@ FstLastTransition* fstLastTransitionCreate(uint8_t inp, Output out) { return trn; } -void fstLastTransitionDestroy(FstLastTransition* trn) { - free(trn); -} +void fstLastTransitionDestroy(FstLastTransition* trn) { free(trn); } void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, CompiledAddr addr) { FstLastTransition* trn = unNode->last; if (trn == NULL) { return; } @@ -959,9 +957,10 @@ Fst* fstCreate(FstSlice* slice) { fst->meta->checkSum = checkSum; FstSlice* s = calloc(1, sizeof(FstSlice)); - *s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice)); + *s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice) - 1); fst->data = s; + pthread_mutex_init(&fst->mtx, NULL); return fst; FST_CREAT_FAILED: @@ -973,14 +972,18 @@ void fstDestroy(Fst* fst) { free(fst->meta); fstSliceDestroy(fst->data); free(fst->data); + pthread_mutex_destroy(&fst->mtx); } free(fst); } bool fstGet(Fst* fst, FstSlice* b, Output* out) { + // dec lock range + pthread_mutex_lock(&fst->mtx); FstNode* root = fstGetRoot(fst); Output tOut = 0; int32_t len; + uint8_t* data = fstSliceData(b, &len); SArray* nodes = (SArray*)taosArrayInit(len, sizeof(FstNode*)); @@ -988,7 +991,10 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { for (uint32_t i = 0; i < len; i++) { uint8_t inp = data[i]; Output res = 0; - if (false == fstNodeFindInput(root, inp, &res)) { return false; } + if (false == fstNodeFindInput(root, inp, &res)) { + pthread_mutex_unlock(&fst->mtx); + return false; + } FstTransition trn; fstNodeGetTransitionAt(root, res, &trn); @@ -997,6 +1003,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { taosArrayPush(nodes, &root); } if (!FST_NODE_IS_FINAL(root)) { + pthread_mutex_unlock(&fst->mtx); return false; } else { tOut = tOut + FST_NODE_FINAL_OUTPUT(root); @@ -1007,13 +1014,13 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { fstNodeDestroy(*node); } taosArrayDestroy(nodes); - fst->root = NULL; + pthread_mutex_unlock(&fst->mtx); *out = tOut; - return true; } FstStreamBuilder* fstSearch(Fst* fst, AutomationCtx* ctx) { + // refactor later return fstStreamBuilderCreate(fst, ctx); } StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) { @@ -1021,24 +1028,30 @@ StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) { return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max); } FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) { + // refactor later return fstStreamBuilderCreate(fst, ctx); } FstNode* fstGetRoot(Fst* fst) { - if (fst->root != NULL) { return fst->root; } CompiledAddr rAddr = fstGetRootAddr(fst); - fst->root = fstGetNode(fst, rAddr); - return fst->root; + return fstGetNode(fst, rAddr); + // pthread_mutex_lock(&fst->mtx); + // if (fst->root != NULL) { + // // pthread_mutex_unlock(&fst->mtx); + // return fst->root; + //} + // CompiledAddr rAddr = fstGetRootAddr(fst); + // fst->root = fstGetNode(fst, rAddr); + //// pthread_mutex_unlock(&fst->mtx); + // return fst->root; } + FstNode* fstGetNode(Fst* fst, CompiledAddr addr) { + // refactor later return fstNodeCreate(fst->meta->version, addr, fst->data); } -FstType fstGetType(Fst* fst) { - return fst->meta->ty; -} -CompiledAddr fstGetRootAddr(Fst* fst) { - return fst->meta->rootAddr; -} +FstType fstGetType(Fst* fst) { return fst->meta->ty; } +CompiledAddr fstGetRootAddr(Fst* fst) { return fst->meta->rootAddr; } Output fstEmptyFinalOutput(Fst* fst, bool* null) { Output res = 0; @@ -1053,8 +1066,7 @@ Output fstEmptyFinalOutput(Fst* fst, bool* null) { } bool fstVerify(Fst* fst) { - uint32_t checkSum = fst->meta->checkSum; - int32_t len; + uint32_t len, checkSum = fst->meta->checkSum; uint8_t* data = fstSliceData(fst->data, &len); TSCKSUM initSum = 0; if (!taosCheckChecksumWhole(data, len)) { return false; } @@ -1094,15 +1106,12 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData* bound) { } } -bool fstBoundWithDataIsIncluded(FstBoundWithData* bound) { - return bound->type == Excluded ? false : true; -} +bool fstBoundWithDataIsIncluded(FstBoundWithData* bound) { return bound->type == Excluded ? false : true; } -void fstBoundDestroy(FstBoundWithData* bound) { - free(bound); -} +void fstBoundDestroy(FstBoundWithData* bound) { free(bound); } -StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, FstBoundWithData* max) { +StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, + FstBoundWithData* max) { StreamWithState* sws = calloc(1, sizeof(StreamWithState)); if (sws == NULL) { return NULL; } @@ -1131,7 +1140,9 @@ void streamWithStateDestroy(StreamWithState* sws) { bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) { AutomationCtx* aut = sws->aut; if (fstBoundWithDataIsEmpty(min)) { - if (fstBoundWithDataIsIncluded(min)) { sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null)); } + if (fstBoundWithDataIsIncluded(min)) { + sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null)); + } StreamState s = {.node = fstGetRoot(sws->fst), .trans = 0, .out = {.null = false, .out = 0}, @@ -1203,7 +1214,8 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) { uint64_t trans = s->trans; FstTransition trn; fstNodeGetTransitionAt(n, trans - 1, &trn); - StreamState s = {.node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState}; + StreamState s = { + .node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState}; taosArrayPush(sws->stack, &s); return true; } @@ -1260,9 +1272,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb size_t isz = taosArrayGetSize(sws->inp); uint8_t* buf = (uint8_t*)malloc(isz * sizeof(uint8_t)); - for (uint32_t i = 0; i < isz; i++) { - buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i); - } + for (uint32_t i = 0; i < isz; i++) { buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i); } FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp)); if (fstBoundWithDataExceededBy(sws->endAt, &slice)) { taosArrayDestroyEx(sws->stack, streamStateDestroy); @@ -1327,8 +1337,8 @@ FstStreamBuilder* fstStreamBuilderCreate(Fst* fst, AutomationCtx* aut) { } void fstStreamBuilderDestroy(FstStreamBuilder* b) { fstSliceDestroy(&b->min->data); - tfree(b->min); fstSliceDestroy(&b->max->data); + tfree(b->min); tfree(b->max); free(b); } diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c index 590ff294bf8f1841fc27e4519df1d23f668adcd5..c6e3cee3e3b375fe283e7333e726a224b011c91c 100644 --- a/source/libs/index/src/index_fst_automation.c +++ b/source/libs/index/src/index_fst_automation.c @@ -17,9 +17,7 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) { StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue)); - if (nsv == NULL) { - return NULL; - } + if (nsv == NULL) { return NULL; } nsv->kind = kind; nsv->type = ty; @@ -37,9 +35,7 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueTyp } void startWithStateValueDestroy(void* val) { StartWithStateValue* sv = (StartWithStateValue*)val; - if (sv == NULL) { - return; - } + if (sv == NULL) { return; } if (sv->type == FST_INT) { // @@ -52,9 +48,7 @@ void startWithStateValueDestroy(void* val) { } StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) { StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue)); - if (nsv == NULL) { - return NULL; - } + if (nsv == NULL) { return NULL; } nsv->kind = sv->kind; nsv->type = sv->type; @@ -94,14 +88,10 @@ static bool prefixCanMatch(AutomationCtx* ctx, void* sv) { static bool prefixWillAlwaysMatch(AutomationCtx* ctx, void* state) { return true; } static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) { StartWithStateValue* ssv = (StartWithStateValue*)state; - if (ssv == NULL || ctx == NULL) { - return NULL; - } + if (ssv == NULL || ctx == NULL) { return NULL; } char* data = ctx->data; - if (ssv->kind == Done) { - return startWithStateValueCreate(Done, FST_INT, &ssv->val); - } + if (ssv->kind == Done) { return startWithStateValueCreate(Done, FST_INT, &ssv->val); } if ((strlen(data) > ssv->val) && data[ssv->val] == byte) { int val = ssv->val + 1; @@ -138,9 +128,7 @@ AutomationFunc automFuncs[] = { AutomationCtx* automCtxCreate(void* data, AutomationType atype) { AutomationCtx* ctx = calloc(1, sizeof(AutomationCtx)); - if (ctx == NULL) { - return NULL; - } + if (ctx == NULL) { return NULL; } StartWithStateValue* sv = NULL; if (atype == AUTOMATION_ALWAYS) { diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 710db563d9e39ddea08832c03db6b0628afc2cc2..7906dfea111ae3bdab56a774abab0226305dd530 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -42,8 +42,8 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) { static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) { int nRead = 0; if (ctx->type == TFile) { - tfLseek(ctx->file.fd, offset, 0); - nRead = tfRead(ctx->file.fd, buf, len); + // tfLseek(ctx->file.fd, offset, 0); + nRead = tfPread(ctx->file.fd, buf, len, offset); } else { // refactor later assert(0); @@ -52,6 +52,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off } static int writeCtxDoFlush(WriterCtx* ctx) { if (ctx->type == TFile) { + // taosFsyncFile(ctx->file.fd); tfFsync(ctx->file.fd); // tfFlush(ctx->file.fd); } else { @@ -69,13 +70,15 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int // ugly code, refactor later ctx->file.readOnly = readOnly; if (readOnly == false) { + // ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); ctx->file.fd = tfOpenCreateWriteAppend(path); } else { - ctx->file.fd = tfOpenReadWrite(path); + // ctx->file.fd = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO); + ctx->file.fd = tfOpenRead(path); } memcpy(ctx->file.buf, path, strlen(path)); if (ctx->file.fd < 0) { - indexError("open file error %d", errno); + indexError("failed to open file, error %d", errno); goto END; } } else if (ctx->type == TMemory) { @@ -101,10 +104,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { free(ctx->mem.buf); } else { tfClose(ctx->file.fd); - if (remove) { - indexError("rm file %s", ctx->file.buf); - unlink(ctx->file.buf); - } + if (remove) { unlink(ctx->file.buf); } } free(ctx); } @@ -144,7 +144,8 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) } uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; } -int fstCountingWriterFlush(FstCountingWriter* write) { + +int fstCountingWriterFlush(FstCountingWriter* write) { WriterCtx* ctx = write->wrt; ctx->flush(ctx); // write->wtr->flush diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 17322e301e3b2227d610b7af45819d617b063a25..fc4f8593a105f5898fb977a8de0110bf83e5d6e5 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -53,13 +53,6 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version); static void tfileSerialCacheKey(TFileCacheKey* key, char* buf); -static TFileReader* tfileReaderCreateImpl(WriterCtx* ctx) { - TFileReader* reader = tfileReaderCreate(ctx); - tfileReaderRef(reader); - // tfileSerialCacheKey(&key, buf); - return reader; -} - TFileCache* tfileCacheCreate(const char* path) { TFileCache* tcache = calloc(1, sizeof(TFileCache)); if (tcache == NULL) { return NULL; } @@ -88,13 +81,16 @@ TFileCache* tfileCacheCreate(const char* path) { } char buf[128] = {0}; - TFileReader* reader = tfileReaderCreateImpl(wc); + TFileReader* reader = tfileReaderCreate(wc); TFileHeader* header = &reader->header; TFileCacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; tfileSerialCacheKey(&key, buf); + + tfileReaderRef(reader); + // indexTable taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); } taosArrayDestroyEx(files, tfileDestroyFileName); @@ -139,6 +135,7 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) if (p != NULL) { TFileReader* oldReader = *p; taosHashRemove(tcache->tableCache, buf, strlen(buf)); + oldReader->remove = true; tfileReaderUnRef(oldReader); } @@ -152,7 +149,6 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { // T_REF_INC(reader); reader->ctx = ctx; - if (0 != tfileReaderLoadHeader(reader)) { tfileReaderDestroy(reader); indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, @@ -172,7 +168,7 @@ void tfileReaderDestroy(TFileReader* reader) { if (reader == NULL) { return; } // T_REF_INC(reader); fstDestroy(reader->fst); - writerCtxDestroy(reader->ctx, true); + writerCtxDestroy(reader->ctx, reader->remove); free(reader); } @@ -232,7 +228,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024); if (wc == NULL) { return NULL; } - TFileReader* reader = tfileReaderCreateImpl(wc); + TFileReader* reader = tfileReaderCreate(wc); return reader; // tfileSerialCacheKey(&key, buf); @@ -330,13 +326,16 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { return -1; } // write fst + indexError("--------Begin----------------"); for (size_t i = 0; i < sz; i++) { // TODO, fst batch write later TFileValue* v = taosArrayGetP((SArray*)data, i); if (tfileWriteData(tw, v) == 0) { // } + indexError("data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId)); } + indexError("--------End----------------"); fstBuilderFinish(tw->fb); fstBuilderDestroy(tw->fb); tw->fb = NULL; @@ -360,7 +359,10 @@ IndexTFile* indexTFileCreate(const char* path) { tfile->cache = tfileCacheCreate(path); return tfile; } -void IndexTFileDestroy(IndexTFile* tfile) { free(tfile); } +void IndexTFileDestroy(IndexTFile* tfile) { + tfileCacheDestroy(tfile->cache); + free(tfile); +} int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { int ret = -1; @@ -539,8 +541,14 @@ static int tfileReaderLoadHeader(TFileReader* reader) { char buf[TFILE_HEADER_SIZE] = {0}; int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0); - assert(nread == sizeof(buf)); + if (nread == -1) { + // + indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf), + errno, reader->ctx->file.fd, reader->ctx->file.buf); + } + // assert(nread == sizeof(buf)); memcpy(&reader->header, buf, sizeof(buf)); + return 0; } static int tfileReaderLoadFst(TFileReader* reader) { @@ -573,7 +581,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* char* buf = calloc(1, total); if (buf == NULL) { return -1; } - nread = ctx->read(ctx, buf, total); + nread = ctx->readFrom(ctx, buf, total, offset + sizeof(nid)); assert(total == nread); for (int32_t i = 0; i < nid; i++) { taosArrayPush(result, (uint64_t*)buf + i); } diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index a4d8bb36f11c86b155c599c94bb8ce79c9466c5e..da974ce6c4be500370eefe4b112f507b5e7912e6 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -1,6 +1,7 @@ #include #include +#include #include #include "index.h" #include "indexInt.h" @@ -42,7 +43,8 @@ class FstWriter { class FstReadMemory { public: - FstReadMemory(size_t size) { + FstReadMemory(size_t size, const std::string& fileName = fileName) { + tfInit(); _wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024); _w = fstCountingWriterCreate(_wc); _size = size; @@ -101,6 +103,7 @@ class FstReadMemory { fstDestroy(_fst); fstSliceDestroy(&_s); writerCtxDestroy(_wc, false); + tfCleanup(); } private: @@ -165,8 +168,44 @@ void checkFstCheckIterator() { delete m; tfCleanup(); } -int main() { - checkFstCheckIterator(); + +void fst_get(Fst* fst) { + for (int i = 0; i < 10000; i++) { + std::string term = "Hello"; + FstSlice key = fstSliceCreate((uint8_t*)term.c_str(), term.size()); + uint64_t offset = 0; + bool ret = fstGet(fst, &key, &offset); + if (ret == false) { + std::cout << "not found" << std::endl; + } else { + std::cout << "found value:" << offset << std::endl; + } + } +} + +#define NUM_OF_THREAD 10 +void validateTFile(char* arg) { + tfInit(); + + std::thread threads[NUM_OF_THREAD]; + // std::vector threads; + TFileReader* reader = tfileReaderOpen(arg, 0, 295868, "tag1"); + + for (int i = 0; i < NUM_OF_THREAD; i++) { + threads[i] = std::thread(fst_get, reader->fst); + // threads.push_back(fst_get, reader->fst); + // std::thread t(fst_get, reader->fst); + } + for (int i = 0; i < NUM_OF_THREAD; i++) { + // wait join + threads[i].join(); + } + tfCleanup(); +} +int main(int argc, char* argv[]) { + if (argc > 1) { validateTFile(argv[1]); } + // checkFstCheckIterator(); // checkFstPrefixSearch(); + return 1; } diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 588205c67ff95b4a1836b37c17681b33f2fbd414..080becccf183cd374ed833895c02b1f5a949803f 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -15,6 +15,7 @@ #include #include #include +#include #include "index.h" #include "indexInt.h" #include "index_cache.h" @@ -25,6 +26,9 @@ #include "tskiplist.h" #include "tutil.h" using namespace std; + +#define NUM_OF_THREAD 10 + class DebugInfo { public: DebugInfo(const char* str) : info(str) { @@ -41,6 +45,7 @@ class DebugInfo { private: std::string info; }; + class FstWriter { public: FstWriter() { @@ -332,6 +337,8 @@ class TFileObj { TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage") : path_(path), colName_(colName) { colId_ = 10; + reader_ = NULL; + writer_ = NULL; // Do Nothing // } @@ -527,6 +534,7 @@ TEST_F(IndexCacheEnv, cache_test) { SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); + // indexTermDestry(term); } { std::string colVal("v3"); @@ -634,6 +642,23 @@ class IndexObj { indexMultiTermDestroy(terms); return numOfTable; } + int WriteMultiMillonData(const std::string& colName, const std::string& colVal = "Hello world", + size_t numOfTable = 100 * 10000) { + std::string tColVal = colVal; + for (int i = 0; i < numOfTable; i++) { + tColVal[tColVal.size() - 1] = 'a' + i % 26; + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 10; i++) { + int ret = Put(terms, i); + assert(ret == 0); + } + indexMultiTermDestroy(terms); + } + return numOfTable; + } int Put(SIndexMultiTerm* fvs, uint64_t uid) { numOfWrite += taosArrayGetSize(fvs); @@ -656,6 +681,14 @@ class IndexObj { return taosArrayGetSize(result); // assert(taosArrayGetSize(result) == targetSize); } + void PutOne(const std::string& colName, const std::string& colVal) { + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + Put(terms, 10); + indexMultiTermDestroy(terms); + } void Debug() { std::cout << "numOfWrite:" << numOfWrite << std::endl; std::cout << "numOfRead:" << numOfRead << std::endl; @@ -687,7 +720,7 @@ class IndexEnv2 : public ::testing::Test { IndexObj* index; }; TEST_F(IndexEnv2, testIndexOpen) { - std::string path = "/tmp"; + std::string path = "/tmp/test"; if (index->Init(path) != 0) { std::cout << "failed to init index" << std::endl; exit(1); @@ -723,10 +756,24 @@ TEST_F(IndexEnv2, testIndexOpen) { } indexMultiTermDestroy(terms); } - { + size_t size = 200; std::string colName("tag1"), colVal("Hello"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = size * 3; i < size * 4; i++) { + int tableId = i; + int ret = index->Put(terms, tableId); + assert(ret == 0); + } + indexMultiTermDestroy(terms); + } + + { + std::string colName("tag1"), colVal("Hello"); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); @@ -735,21 +782,44 @@ TEST_F(IndexEnv2, testIndexOpen) { SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); index->Search(mq, result); std::cout << "target size: " << taosArrayGetSize(result) << std::endl; - // assert(taosArrayGetSize(result) == targetSize); + assert(taosArrayGetSize(result) == 400); } } TEST_F(IndexEnv2, testIndex_TrigeFlush) { - std::string path = "/tmp"; - if (index->Init(path) != 0) {} + std::string path = "/tmp/test"; + if (index->Init(path) != 0) { + // r + std::cout << "failed to init" << std::endl; + } int numOfTable = 100 * 10000; - index->WriteMillonData("tag1", "Hello world", numOfTable); - int target = index->SearchOne("tag1", "Hello world"); + index->WriteMillonData("tag1", "Hello", numOfTable); + int target = index->SearchOne("tag1", "Hello"); assert(numOfTable == target); } + +static void write_and_search(IndexObj* idx) { + std::string colName("tag1"), colVal("Hello"); + + int target = idx->SearchOne("tag1", "Hello"); + idx->PutOne(colName, colVal); +} TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { - std::string path = "/tmp"; - if (index->Init(path) != 0) {} + std::string path = "/tmp/cache_and_tfile"; + if (index->Init(path) != 0) { + // opt + } + index->WriteMultiMillonData("tag1", "Hello", 200000); + std::thread threads[NUM_OF_THREAD]; + + for (int i = 0; i < NUM_OF_THREAD; i++) { + // + threads[i] = std::thread(write_and_search, index); + } + for (int i = 0; i < NUM_OF_THREAD; i++) { + // TOD + threads[i].join(); + } } TEST_F(IndexEnv2, testIndex_multi_thread_write) { std::string path = "/tmp"; @@ -769,4 +839,7 @@ TEST_F(IndexEnv2, testIndex_performance) { std::string path = "/tmp"; if (index->Init(path) != 0) {} } -TEST_F(IndexEnv2, testIndexMultiTag) {} +TEST_F(IndexEnv2, testIndexMultiTag) { + std::string path = "/tmp"; + if (index->Init(path) != 0) {} +} diff --git a/source/util/src/tfile.c b/source/util/src/tfile.c index 313f1d97afbf3f60a253105dfcac23a91af131e9..4cb20802c78176b8cd6791f463ed8abeda3df7d7 100644 --- a/source/util/src/tfile.c +++ b/source/util/src/tfile.c @@ -16,21 +16,19 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" -#include "ulog.h" -#include "tutil.h" #include "tref.h" +#include "tutil.h" +#include "ulog.h" static int32_t tsFileRsetId = -1; static int8_t tfInited = 0; -static void tfCloseFile(void *p) { - taosCloseFile((int32_t)(uintptr_t)p); -} +static void tfCloseFile(void *p) { taosCloseFile((int32_t)(uintptr_t)p); } int32_t tfInit() { int8_t old = atomic_val_compare_exchange_8(&tfInited, 0, 1); - if(old == 1) return 0; + if (old == 1) return 0; tsFileRsetId = taosOpenRef(2000, tfCloseFile); if (tsFileRsetId > 0) { return 0; @@ -79,9 +77,7 @@ int64_t tfOpenCreateWriteAppend(const char *pathname, int32_t flags, mode_t mode return tfOpenImp(fd); } -int64_t tfClose(int64_t tfd) { - return taosRemoveRef(tsFileRsetId, tfd); -} +int64_t tfClose(int64_t tfd) { return taosRemoveRef(tsFileRsetId, tfd); } int64_t tfWrite(int64_t tfd, void *buf, int64_t count) { void *p = taosAcquireRef(tsFileRsetId, tfd); @@ -109,6 +105,19 @@ int64_t tfRead(int64_t tfd, void *buf, int64_t count) { return ret; } +int64_t tfPread(int64_t tfd, void *buf, int64_t count, int32_t offset) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return -1; + + int32_t fd = (int32_t)(uintptr_t)p; + + int64_t ret = pread(fd, buf, count, offset); + if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); + + taosReleaseRef(tsFileRsetId, tfd); + return ret; +} + int32_t tfFsync(int64_t tfd) { void *p = taosAcquireRef(tsFileRsetId, tfd); if (p == NULL) return -1;