diff --git a/source/libs/index/inc/index_fst_counting_writer.h b/source/libs/index/inc/index_fst_counting_writer.h index 1504a69a08ebcf07a3617b7d8daf123ca06c3738..fcc0d5a0b3cf94c65fb83af53cc7bc9c3941ef93 100644 --- a/source/libs/index/inc/index_fst_counting_writer.h +++ b/source/libs/index/inc/index_fst_counting_writer.h @@ -38,6 +38,7 @@ typedef struct WriterCtx { int fd; bool readOnly; char buf[256]; + int size; } file; struct { int32_t capa; diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 9c7320b301058a0c91e59d28dc676f9da29f385b..0657c68458465d8279f0e75b83951bd6b4bfd9e0 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -73,6 +73,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { #ifdef USE_INVERTED_INDEX // sIdx->cache = (void*)indexCacheCreate(sIdx); sIdx->tindex = indexTFileCreate(path); + if (sIdx->tindex == NULL) { goto END; } sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); sIdx->cVersion = 1; sIdx->path = calloc(1, strlen(path) + 1); @@ -83,6 +84,8 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { return 0; #endif +END: + if (sIdx != NULL) { indexClose(sIdx); } *index = NULL; return -1; @@ -135,7 +138,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { SIndexTerm* p = taosArrayGetP(fVals, i); char buf[128] = {0}; - ICacheKey key = {.suid = p->suid, .colName = p->colName}; + ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; int32_t sz = indexSerialCacheKey(&key, buf); IndexCache** cache = taosHashGet(index->colObj, buf, sz); @@ -150,7 +153,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { SIndexTerm* p = taosArrayGetP(fVals, i); char buf[128] = {0}; - ICacheKey key = {.suid = p->suid, .colName = p->colName}; + ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; int32_t sz = indexSerialCacheKey(&key, buf); IndexCache** cache = taosHashGet(index->colObj, buf, sz); @@ -212,7 +215,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result indexInterResultsDestroy(interResults); #endif - return 1; + return 0; } int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { @@ -310,7 +313,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result pthread_mutex_lock(&sIdx->mtx); char buf[128] = {0}; - ICacheKey key = {.suid = term->suid, .colName = term->colName}; + ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)}; int32_t sz = indexSerialCacheKey(&key, buf); IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz); diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index b4c533e998647c6540da8eef35604a4f05922377..e95de9286e409bc2c3147a1728d8055274e4f4f3 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -20,7 +20,7 @@ #define MAX_INDEX_KEY_LEN 256 // test only, change later -#define MEM_TERM_LIMIT 10 * 10000 +#define MEM_TERM_LIMIT 5 * 10000 // ref index_cache.h:22 //#define CACHE_KEY_LEN(p) \ // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 7906dfea111ae3bdab56a774abab0226305dd530..0f29da1c27cf480071ef795fa385c78552d9edce 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -72,9 +72,17 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int if (readOnly == false) { // ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); ctx->file.fd = tfOpenCreateWriteAppend(path); + + struct stat fstat; + stat(path, &fstat); + ctx->file.size = fstat.st_size; } else { // ctx->file.fd = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO); ctx->file.fd = tfOpenRead(path); + + struct stat fstat; + stat(path, &fstat); + ctx->file.size = fstat.st_size; } memcpy(ctx->file.buf, path, strlen(path)); if (ctx->file.fd < 0) { @@ -104,6 +112,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { free(ctx->mem.buf); } else { tfClose(ctx->file.fd); + ctx->flush(ctx); if (remove) { unlink(ctx->file.buf); } } free(ctx); diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 95c713fb0a5a328c50761776db76725da2fa0229..d4d13ddf197e780e89ab24469a318cf2ae9feb41 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -1,6 +1,6 @@ /* * Copyright (c) 2019 TAOS Data, Inc. - * +p * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. @@ -45,12 +45,13 @@ static int tfileReaderLoadHeader(TFileReader* reader); static int tfileReaderLoadFst(TFileReader* reader); static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); -static int tfileGetFileList(const char* path, SArray* result); -static int tfileRmExpireFile(SArray* result); -static void tfileDestroyFileName(void* elem); -static int tfileCompare(const void* a, const void* b); -static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version); -static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version); +static SArray* tfileGetFileList(const char* path); +static int tfileRmExpireFile(SArray* result); +static void tfileDestroyFileName(void* elem); +static int tfileCompare(const void* a, const void* b); +static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version); +static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version); +static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version); TFileCache* tfileCacheCreate(const char* path) { TFileCache* tcache = calloc(1, sizeof(TFileCache)); @@ -59,21 +60,24 @@ TFileCache* tfileCacheCreate(const char* path) { tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); tcache->capacity = 64; - SArray* files = taosArrayInit(4, sizeof(void*)); - tfileGetFileList(path, files); - taosArraySort(files, tfileCompare); - tfileRmExpireFile(files); + SArray* files = tfileGetFileList(path); uint64_t suid; int32_t colId, version; for (size_t i = 0; i < taosArrayGetSize(files); i++) { char* file = taosArrayGetP(files, i); - if (0 != tfileParseFileName(file, &suid, (int*)&colId, (int*)&version)) { + + // refactor later, use colname and version info + char colName[256] = {0}; + if (0 != tfileParseFileName(file, &suid, colName, (int*)&version)) { indexInfo("try parse invalid file: %s, skip it", file); continue; } - WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64); + char fullName[256] = {0}; + sprintf(fullName, "%s/%s", path, file); + + WriterCtx* wc = writerCtxCreate(TFile, fullName, true, 1024 * 1024 * 64); if (wc == NULL) { indexError("failed to open index:%s", file); goto End; @@ -200,12 +204,9 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul } TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) { - char filename[128] = {0}; - int32_t coldId = 1; - tfileGenFileName(filename, suid, coldId, version); - char fullname[256] = {0}; - snprintf(fullname, sizeof(fullname), "%s/%s", path, filename); + tfileGenFileFullName(fullname, path, suid, colName, version); + // indexInfo("open write file name %s", fullname); WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64); if (wcx == NULL) { return NULL; } @@ -218,13 +219,11 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c return tfileWriterCreate(wcx, &tfh); } TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName) { - char filename[128] = {0}; - int32_t coldId = 1; - tfileGenFileName(filename, suid, coldId, version); - char fullname[256] = {0}; - snprintf(fullname, sizeof(fullname), "%s/%s", path, filename); + tfileGenFileFullName(fullname, path, suid, colName, version); + WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024); + // indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size); if (wc == NULL) { return NULL; } TFileReader* reader = tfileReaderCreate(wc); @@ -324,7 +323,6 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { } // write data - indexError("--------Begin----------------"); for (size_t i = 0; i < sz; i++) { // TODO, fst batch write later TFileValue* v = taosArrayGetP((SArray*)data, i); @@ -332,11 +330,10 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId)); } else { - indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset, - (int)taosArrayGetSize(v->tableId)); + // indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset, + // (int)taosArrayGetSize(v->tableId)); } } - indexError("--------End----------------"); fstBuilderFinish(tw->fb); fstBuilderDestroy(tw->fb); tw->fb = NULL; @@ -361,6 +358,7 @@ IndexTFile* indexTFileCreate(const char* path) { return tfile; } void indexTFileDestroy(IndexTFile* tfile) { + if (tfile == NULL) { return; } tfileCacheDestroy(tfile->cache); free(tfile); } @@ -550,6 +548,9 @@ static int tfileReaderLoadHeader(TFileReader* reader) { // indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf), errno, reader->ctx->file.fd, reader->ctx->file.buf); + } else { + indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf), + errno, reader->ctx->file.fd, reader->ctx->file.buf); } // assert(nread == sizeof(buf)); memcpy(&reader->header, buf, sizeof(buf)); @@ -558,13 +559,14 @@ static int tfileReaderLoadHeader(TFileReader* reader) { } static int tfileReaderLoadFst(TFileReader* reader) { // current load fst into memory, refactor it later - static int FST_MAX_SIZE = 64 * 1024; + static int FST_MAX_SIZE = 64 * 1024 * 1024; char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE); if (buf == NULL) { return -1; } WriterCtx* ctx = reader->ctx; int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); + indexError("nread = %d, and fst offset=%d, filename: %s ", nread, reader->header.fstOffset, ctx->file.buf); // we assuse fst size less than FST_MAX_SIZE assert(nread > 0 && nread < FST_MAX_SIZE); @@ -608,19 +610,26 @@ void tfileReaderUnRef(TFileReader* reader) { } } -static int tfileGetFileList(const char* path, SArray* result) { +static SArray* tfileGetFileList(const char* path) { + SArray* files = taosArrayInit(4, sizeof(void*)); + DIR* dir = opendir(path); - if (NULL == dir) { return -1; } + if (NULL == dir) { return NULL; } struct dirent* entry; while ((entry = readdir(dir)) != NULL) { + if (entry->d_type && DT_DIR) { continue; } size_t len = strlen(entry->d_name); char* buf = calloc(1, len + 1); memcpy(buf, entry->d_name, len); - taosArrayPush(result, &buf); + taosArrayPush(files, &buf); } closedir(dir); - return 0; + + taosArraySort(files, tfileCompare); + tfileRmExpireFile(files); + + return files; } static int tfileRmExpireFile(SArray* result) { // TODO(yihao): remove expire tindex after restart @@ -641,15 +650,21 @@ static int tfileCompare(const void* a, const void* b) { if (ret == 0) { return ret; } return ret < 0 ? -1 : 1; } -// tfile name suid-colId-version.tindex -static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version) { - sprintf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version); - return; -} -static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version) { - if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) { + +static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version) { + if (3 == sscanf(filename, "%" PRIu64 "-%[^-]-%d.tindex", suid, col, version)) { // read suid & colid & version success return 0; } return -1; } +// tfile name suid-colId-version.tindex +static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version) { + sprintf(filename, "%" PRIu64 "-%s-%d.tindex", suid, col, version); + return; +} +static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version) { + char filename[128] = {0}; + tfileGenFileName(filename, suid, col, version); + sprintf(fullname, "%s/%s", path, filename); +} diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index bdfb86ce1714ab3b309091718c1b72676b8120fd..77b6e02f186356e8e16a66afb79716e8a0a5479d 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -13,6 +13,7 @@ * along with this program. If not, see . */ #include +#include #include #include #include @@ -638,7 +639,7 @@ class IndexObj { indexInit(); } int Init(const std::string& dir) { - taosRemoveDir(dir.c_str()); + // taosRemoveDir(dir.c_str()); taosMkDir(dir.c_str()); int ret = indexOpen(&opts, dir.c_str(), &idx); if (ret != 0) { @@ -663,10 +664,11 @@ class IndexObj { int WriteMultiMillonData(const std::string& colName, const std::string& colVal = "Hello world", size_t numOfTable = 100 * 10000) { std::string tColVal = colVal; + size_t colValSize = tColVal.size(); for (int i = 0; i < numOfTable; i++) { - tColVal[tColVal.size() - 1] = 'a' + i % 26; + tColVal[i % colValSize] = 'a' + i % 26; SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), - colVal.c_str(), colVal.size()); + tColVal.c_str(), tColVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); for (size_t i = 0; i < 10; i++) { @@ -695,7 +697,13 @@ class IndexObj { indexMultiTermQueryAdd(mq, term, QUERY_TERM); SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); - if (Search(mq, result) == 0) { std::cout << "search one successfully" << std::endl; } + + int64_t s = taosGetTimestampUs(); + if (Search(mq, result) == 0) { + int64_t e = taosGetTimestampUs(); + std::cout << "search one successfully and time cost:" << e - s << std::endl; + } else { + } int sz = taosArrayGetSize(result); indexMultiTermQueryDestroy(mq); taosArrayDestroy(result); @@ -810,7 +818,7 @@ TEST_F(IndexEnv2, testIndexOpen) { } TEST_F(IndexEnv2, testIndex_TrigeFlush) { - std::string path = "/tmp/test1"; + std::string path = "/tmp/testxxx"; if (index->Init(path) != 0) { // r std::cout << "failed to init" << std::endl; @@ -826,6 +834,10 @@ static void write_and_search(IndexObj* idx) { std::string colName("tag1"), colVal("Hello"); int target = idx->SearchOne("tag1", "Hello"); + std::cout << "search: " << target << std::endl; + target = idx->SearchOne("tag2", "Test"); + std::cout << "search: " << target << std::endl; + idx->PutOne(colName, colVal); } TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { @@ -833,7 +845,10 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { if (index->Init(path) != 0) { // opt } - index->WriteMultiMillonData("tag1", "Hello", 200000); + index->PutOne("tag1", "Hello"); + index->PutOne("tag2", "Test"); + index->WriteMultiMillonData("tag1", "Hello", 50 * 10000); + index->WriteMultiMillonData("tag2", "Test", 50 * 10000); std::thread threads[NUM_OF_THREAD]; for (int i = 0; i < NUM_OF_THREAD; i++) { @@ -847,15 +862,15 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { } TEST_F(IndexEnv2, testIndex_restart) { - std::string path = "/tmp"; + std::string path = "/tmp/test1"; if (index->Init(path) != 0) {} } TEST_F(IndexEnv2, testIndex_performance) { - std::string path = "/tmp"; + std::string path = "/tmp/test2"; if (index->Init(path) != 0) {} } TEST_F(IndexEnv2, testIndexMultiTag) { - std::string path = "/tmp"; + std::string path = "/tmp/test3"; if (index->Init(path) != 0) {} }