diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 0657c68458465d8279f0e75b83951bd6b4bfd9e0..691e564c68f48f413e32644718802d1e5b57efd1 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -74,16 +74,15 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { // sIdx->cache = (void*)indexCacheCreate(sIdx); sIdx->tindex = indexTFileCreate(path); if (sIdx->tindex == NULL) { goto END; } + sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); sIdx->cVersion = 1; - sIdx->path = calloc(1, strlen(path) + 1); - memcpy(sIdx->path, path, strlen(path)); + sIdx->path = tstrdup(path); pthread_mutex_init(&sIdx->mtx, NULL); - *index = sIdx; - return 0; #endif + END: if (sIdx != NULL) { indexClose(sIdx); } @@ -310,18 +309,14 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result // Get col info IndexCache* cache = NULL; - pthread_mutex_lock(&sIdx->mtx); char buf[128] = {0}; ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)}; int32_t sz = indexSerialCacheKey(&key, buf); + pthread_mutex_lock(&sIdx->mtx); IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz); - if (pCache == NULL) { - pthread_mutex_unlock(&sIdx->mtx); - return -1; - } - cache = *pCache; + cache = (pCache == NULL) ? NULL : *pCache; pthread_mutex_unlock(&sIdx->mtx); *result = taosArrayInit(4, sizeof(uint64_t)); @@ -329,7 +324,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result STermValueType s = kTypeValue; if (0 == indexCacheSearch(cache, query, *result, &s)) { if (s == kTypeDeletion) { - indexInfo("col: %s already drop by other opera", term->colName); + indexInfo("col: %s already drop by", term->colName); // coloum already drop by other oper, no need to query tindex return 0; } else { @@ -504,17 +499,15 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { tfileWriterClose(tw); TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName); + if (reader == NULL) { goto END; } - char buf[128] = {0}; TFileHeader* header = &reader->header; ICacheKey key = { .suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; pthread_mutex_lock(&sIdx->mtx); - IndexTFile* ifile = (IndexTFile*)sIdx->tindex; tfileCachePut(ifile->cache, &key, reader); - pthread_mutex_unlock(&sIdx->mtx); return ret; END: diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index e95de9286e409bc2c3147a1728d8055274e4f4f3..a4993257b3476a7a305a7fb12ac69c4fe66bbab4 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -261,7 +261,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA return 0; } int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) { - if (cache == NULL) { return -1; } + if (cache == NULL) { return 0; } IndexCache* pCache = cache; MemTable *mem = NULL, *imm = NULL; diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 0f29da1c27cf480071ef795fa385c78552d9edce..c3e1aab381fa9a2c98f83ba6d78cf15c85cbc304 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -18,7 +18,7 @@ #include "tutil.h" static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) { - if (ctx->offset + len > ctx->limit) { return -1; } + // if (ctx->offset + len > ctx->limit) { return -1; } if (ctx->type == TFile) { assert(len == tfWrite(ctx->file.fd, buf, len)); @@ -111,8 +111,8 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { if (ctx->type == TMemory) { free(ctx->mem.buf); } else { + // ctx->flush(ctx); tfClose(ctx->file.fd); - ctx->flush(ctx); if (remove) { unlink(ctx->file.buf); } } free(ctx); diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index d4d13ddf197e780e89ab24469a318cf2ae9feb41..b32226775dcb9a3352d8fe4b67eb5dbb2a34748a 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -67,29 +67,18 @@ TFileCache* tfileCacheCreate(const char* path) { for (size_t i = 0; i < taosArrayGetSize(files); i++) { char* file = taosArrayGetP(files, i); - // refactor later, use colname and version info - char colName[256] = {0}; - if (0 != tfileParseFileName(file, &suid, colName, (int*)&version)) { - indexInfo("try parse invalid file: %s, skip it", file); - continue; - } - - char fullName[256] = {0}; - sprintf(fullName, "%s/%s", path, file); - - WriterCtx* wc = writerCtxCreate(TFile, fullName, true, 1024 * 1024 * 64); + WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64); if (wc == NULL) { indexError("failed to open index:%s", file); goto End; } - char buf[128] = {0}; TFileReader* reader = tfileReaderCreate(wc); + if (reader == NULL) { goto End; } TFileHeader* header = &reader->header; - ICacheKey key = {.suid = header->suid, - .colName = header->colName, - .nColName = strlen(header->colName), - .colType = header->colType}; + + char buf[128] = {0}; + ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName)}; int32_t sz = indexSerialCacheKey(&key, buf); assert(sz < sizeof(buf)); @@ -256,7 +245,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { // sort by coltype and write to tindex if (order == false) { __compar_fn_t fn; - int8_t colType = tw->header.colType; + + int8_t colType = tw->header.colType; if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) { fn = tfileStrCompare; } else { @@ -351,10 +341,16 @@ void tfileWriterDestroy(TFileWriter* tw) { } IndexTFile* indexTFileCreate(const char* path) { + TFileCache* cache = tfileCacheCreate(path); + if (cache == NULL) { return NULL; } + IndexTFile* tfile = calloc(1, sizeof(IndexTFile)); - if (tfile == NULL) { return NULL; } + if (tfile == NULL) { + tfileCacheDestroy(cache); + return NULL; + } - tfile->cache = tfileCacheCreate(path); + tfile->cache = cache; return tfile; } void indexTFileDestroy(IndexTFile* tfile) { @@ -366,6 +362,7 @@ void indexTFileDestroy(IndexTFile* tfile) { int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { int ret = -1; if (tfile == NULL) { return ret; } + IndexTFile* pTfile = (IndexTFile*)tfile; SIndexTerm* term = query->term; @@ -545,7 +542,6 @@ static int tfileReaderLoadHeader(TFileReader* reader) { int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0); if (nread == -1) { - // indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf), errno, reader->ctx->file.fd, reader->ctx->file.buf); } else { @@ -566,7 +562,8 @@ static int tfileReaderLoadFst(TFileReader* reader) { WriterCtx* ctx = reader->ctx; int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); - indexError("nread = %d, and fst offset=%d, filename: %s ", nread, reader->header.fstOffset, ctx->file.buf); + indexError("nread = %d, and fst offset=%d, filename: %s, size: %d ", nread, reader->header.fstOffset, ctx->file.buf, + ctx->file.size); // we assuse fst size less than FST_MAX_SIZE assert(nread > 0 && nread < FST_MAX_SIZE); @@ -613,15 +610,20 @@ void tfileReaderUnRef(TFileReader* reader) { static SArray* tfileGetFileList(const char* path) { SArray* files = taosArrayInit(4, sizeof(void*)); + char buf[128] = {0}; + uint64_t suid; + uint32_t version; + DIR* dir = opendir(path); if (NULL == dir) { return NULL; } - struct dirent* entry; while ((entry = readdir(dir)) != NULL) { - if (entry->d_type && DT_DIR) { continue; } - size_t len = strlen(entry->d_name); - char* buf = calloc(1, len + 1); - memcpy(buf, entry->d_name, len); + char* file = entry->d_name; + if (0 != tfileParseFileName(file, &suid, buf, &version)) { continue; } + + size_t len = strlen(path) + 1 + strlen(file) + 1; + char* buf = calloc(1, len); + sprintf(buf, "%s/%s", path, file); taosArrayPush(files, &buf); } closedir(dir); diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 77b6e02f186356e8e16a66afb79716e8a0a5479d..b33b5b76321c049278f8dec523dc57c158234a98 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -848,7 +848,7 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { index->PutOne("tag1", "Hello"); index->PutOne("tag2", "Test"); index->WriteMultiMillonData("tag1", "Hello", 50 * 10000); - index->WriteMultiMillonData("tag2", "Test", 50 * 10000); + index->WriteMultiMillonData("tag2", "Test", 10 * 10000); std::thread threads[NUM_OF_THREAD]; for (int i = 0; i < NUM_OF_THREAD; i++) {