提交 4079bd2b 编写于 作者: dengyihao's avatar dengyihao

refactor code

上级 c1a2366d
...@@ -74,16 +74,15 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -74,16 +74,15 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
// sIdx->cache = (void*)indexCacheCreate(sIdx); // sIdx->cache = (void*)indexCacheCreate(sIdx);
sIdx->tindex = indexTFileCreate(path); sIdx->tindex = indexTFileCreate(path);
if (sIdx->tindex == NULL) { goto END; } if (sIdx->tindex == NULL) { goto END; }
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->cVersion = 1; sIdx->cVersion = 1;
sIdx->path = calloc(1, strlen(path) + 1); sIdx->path = tstrdup(path);
memcpy(sIdx->path, path, strlen(path));
pthread_mutex_init(&sIdx->mtx, NULL); pthread_mutex_init(&sIdx->mtx, NULL);
*index = sIdx; *index = sIdx;
return 0; return 0;
#endif #endif
END: END:
if (sIdx != NULL) { indexClose(sIdx); } if (sIdx != NULL) { indexClose(sIdx); }
...@@ -310,18 +309,14 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result ...@@ -310,18 +309,14 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
// Get col info // Get col info
IndexCache* cache = NULL; IndexCache* cache = NULL;
pthread_mutex_lock(&sIdx->mtx);
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)}; ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = indexSerialCacheKey(&key, buf);
pthread_mutex_lock(&sIdx->mtx);
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz); IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
if (pCache == NULL) { cache = (pCache == NULL) ? NULL : *pCache;
pthread_mutex_unlock(&sIdx->mtx);
return -1;
}
cache = *pCache;
pthread_mutex_unlock(&sIdx->mtx); pthread_mutex_unlock(&sIdx->mtx);
*result = taosArrayInit(4, sizeof(uint64_t)); *result = taosArrayInit(4, sizeof(uint64_t));
...@@ -329,7 +324,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result ...@@ -329,7 +324,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
STermValueType s = kTypeValue; STermValueType s = kTypeValue;
if (0 == indexCacheSearch(cache, query, *result, &s)) { if (0 == indexCacheSearch(cache, query, *result, &s)) {
if (s == kTypeDeletion) { if (s == kTypeDeletion) {
indexInfo("col: %s already drop by other opera", term->colName); indexInfo("col: %s already drop by", term->colName);
// coloum already drop by other oper, no need to query tindex // coloum already drop by other oper, no need to query tindex
return 0; return 0;
} else { } else {
...@@ -504,17 +499,15 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { ...@@ -504,17 +499,15 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
tfileWriterClose(tw); tfileWriterClose(tw);
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName); TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
if (reader == NULL) { goto END; }
char buf[128] = {0};
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
ICacheKey key = { ICacheKey key = {
.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; .suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
pthread_mutex_lock(&sIdx->mtx); pthread_mutex_lock(&sIdx->mtx);
IndexTFile* ifile = (IndexTFile*)sIdx->tindex; IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
tfileCachePut(ifile->cache, &key, reader); tfileCachePut(ifile->cache, &key, reader);
pthread_mutex_unlock(&sIdx->mtx); pthread_mutex_unlock(&sIdx->mtx);
return ret; return ret;
END: END:
......
...@@ -261,7 +261,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA ...@@ -261,7 +261,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA
return 0; return 0;
} }
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) { int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
if (cache == NULL) { return -1; } if (cache == NULL) { return 0; }
IndexCache* pCache = cache; IndexCache* pCache = cache;
MemTable *mem = NULL, *imm = NULL; MemTable *mem = NULL, *imm = NULL;
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "tutil.h" #include "tutil.h"
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) { static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
if (ctx->offset + len > ctx->limit) { return -1; } // if (ctx->offset + len > ctx->limit) { return -1; }
if (ctx->type == TFile) { if (ctx->type == TFile) {
assert(len == tfWrite(ctx->file.fd, buf, len)); assert(len == tfWrite(ctx->file.fd, buf, len));
...@@ -111,8 +111,8 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { ...@@ -111,8 +111,8 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
if (ctx->type == TMemory) { if (ctx->type == TMemory) {
free(ctx->mem.buf); free(ctx->mem.buf);
} else { } else {
// ctx->flush(ctx);
tfClose(ctx->file.fd); tfClose(ctx->file.fd);
ctx->flush(ctx);
if (remove) { unlink(ctx->file.buf); } if (remove) { unlink(ctx->file.buf); }
} }
free(ctx); free(ctx);
......
...@@ -67,29 +67,18 @@ TFileCache* tfileCacheCreate(const char* path) { ...@@ -67,29 +67,18 @@ TFileCache* tfileCacheCreate(const char* path) {
for (size_t i = 0; i < taosArrayGetSize(files); i++) { for (size_t i = 0; i < taosArrayGetSize(files); i++) {
char* file = taosArrayGetP(files, i); char* file = taosArrayGetP(files, i);
// refactor later, use colname and version info WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64);
char colName[256] = {0};
if (0 != tfileParseFileName(file, &suid, colName, (int*)&version)) {
indexInfo("try parse invalid file: %s, skip it", file);
continue;
}
char fullName[256] = {0};
sprintf(fullName, "%s/%s", path, file);
WriterCtx* wc = writerCtxCreate(TFile, fullName, true, 1024 * 1024 * 64);
if (wc == NULL) { if (wc == NULL) {
indexError("failed to open index:%s", file); indexError("failed to open index:%s", file);
goto End; goto End;
} }
char buf[128] = {0};
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
if (reader == NULL) { goto End; }
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
ICacheKey key = {.suid = header->suid,
.colName = header->colName, char buf[128] = {0};
.nColName = strlen(header->colName), ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName)};
.colType = header->colType};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = indexSerialCacheKey(&key, buf);
assert(sz < sizeof(buf)); assert(sz < sizeof(buf));
...@@ -256,7 +245,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { ...@@ -256,7 +245,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
// sort by coltype and write to tindex // sort by coltype and write to tindex
if (order == false) { if (order == false) {
__compar_fn_t fn; __compar_fn_t fn;
int8_t colType = tw->header.colType;
int8_t colType = tw->header.colType;
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) { if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
fn = tfileStrCompare; fn = tfileStrCompare;
} else { } else {
...@@ -351,10 +341,16 @@ void tfileWriterDestroy(TFileWriter* tw) { ...@@ -351,10 +341,16 @@ void tfileWriterDestroy(TFileWriter* tw) {
} }
IndexTFile* indexTFileCreate(const char* path) { IndexTFile* indexTFileCreate(const char* path) {
TFileCache* cache = tfileCacheCreate(path);
if (cache == NULL) { return NULL; }
IndexTFile* tfile = calloc(1, sizeof(IndexTFile)); IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
if (tfile == NULL) { return NULL; } if (tfile == NULL) {
tfileCacheDestroy(cache);
return NULL;
}
tfile->cache = tfileCacheCreate(path); tfile->cache = cache;
return tfile; return tfile;
} }
void indexTFileDestroy(IndexTFile* tfile) { void indexTFileDestroy(IndexTFile* tfile) {
...@@ -366,6 +362,7 @@ void indexTFileDestroy(IndexTFile* tfile) { ...@@ -366,6 +362,7 @@ void indexTFileDestroy(IndexTFile* tfile) {
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
int ret = -1; int ret = -1;
if (tfile == NULL) { return ret; } if (tfile == NULL) { return ret; }
IndexTFile* pTfile = (IndexTFile*)tfile; IndexTFile* pTfile = (IndexTFile*)tfile;
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
...@@ -545,7 +542,6 @@ static int tfileReaderLoadHeader(TFileReader* reader) { ...@@ -545,7 +542,6 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0); int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
if (nread == -1) { if (nread == -1) {
//
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf), indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
errno, reader->ctx->file.fd, reader->ctx->file.buf); errno, reader->ctx->file.fd, reader->ctx->file.buf);
} else { } else {
...@@ -566,7 +562,8 @@ static int tfileReaderLoadFst(TFileReader* reader) { ...@@ -566,7 +562,8 @@ static int tfileReaderLoadFst(TFileReader* reader) {
WriterCtx* ctx = reader->ctx; WriterCtx* ctx = reader->ctx;
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
indexError("nread = %d, and fst offset=%d, filename: %s ", nread, reader->header.fstOffset, ctx->file.buf); indexError("nread = %d, and fst offset=%d, filename: %s, size: %d ", nread, reader->header.fstOffset, ctx->file.buf,
ctx->file.size);
// we assuse fst size less than FST_MAX_SIZE // we assuse fst size less than FST_MAX_SIZE
assert(nread > 0 && nread < FST_MAX_SIZE); assert(nread > 0 && nread < FST_MAX_SIZE);
...@@ -613,15 +610,20 @@ void tfileReaderUnRef(TFileReader* reader) { ...@@ -613,15 +610,20 @@ void tfileReaderUnRef(TFileReader* reader) {
static SArray* tfileGetFileList(const char* path) { static SArray* tfileGetFileList(const char* path) {
SArray* files = taosArrayInit(4, sizeof(void*)); SArray* files = taosArrayInit(4, sizeof(void*));
char buf[128] = {0};
uint64_t suid;
uint32_t version;
DIR* dir = opendir(path); DIR* dir = opendir(path);
if (NULL == dir) { return NULL; } if (NULL == dir) { return NULL; }
struct dirent* entry; struct dirent* entry;
while ((entry = readdir(dir)) != NULL) { while ((entry = readdir(dir)) != NULL) {
if (entry->d_type && DT_DIR) { continue; } char* file = entry->d_name;
size_t len = strlen(entry->d_name); if (0 != tfileParseFileName(file, &suid, buf, &version)) { continue; }
char* buf = calloc(1, len + 1);
memcpy(buf, entry->d_name, len); size_t len = strlen(path) + 1 + strlen(file) + 1;
char* buf = calloc(1, len);
sprintf(buf, "%s/%s", path, file);
taosArrayPush(files, &buf); taosArrayPush(files, &buf);
} }
closedir(dir); closedir(dir);
......
...@@ -848,7 +848,7 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { ...@@ -848,7 +848,7 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
index->PutOne("tag1", "Hello"); index->PutOne("tag1", "Hello");
index->PutOne("tag2", "Test"); index->PutOne("tag2", "Test");
index->WriteMultiMillonData("tag1", "Hello", 50 * 10000); index->WriteMultiMillonData("tag1", "Hello", 50 * 10000);
index->WriteMultiMillonData("tag2", "Test", 50 * 10000); index->WriteMultiMillonData("tag2", "Test", 10 * 10000);
std::thread threads[NUM_OF_THREAD]; std::thread threads[NUM_OF_THREAD];
for (int i = 0; i < NUM_OF_THREAD; i++) { for (int i = 0; i < NUM_OF_THREAD; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册