未验证 提交 6a4424b5 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9296 from taosdata/feature/index_cache

update tindex reader
...@@ -47,7 +47,6 @@ typedef struct TFileHeader { ...@@ -47,7 +47,6 @@ typedef struct TFileHeader {
typedef struct TFileCacheKey { typedef struct TFileCacheKey {
uint64_t suid; uint64_t suid;
uint8_t colType; uint8_t colType;
int32_t version;
char* colName; char* colName;
int32_t nColName; int32_t nColName;
} TFileCacheKey; } TFileCacheKey;
...@@ -67,6 +66,7 @@ typedef struct TFileWriter { ...@@ -67,6 +66,7 @@ typedef struct TFileWriter {
uint32_t offset; uint32_t offset;
} TFileWriter; } TFileWriter;
// multi reader and single write
typedef struct TFileReader { typedef struct TFileReader {
T_REF_DECLARE() T_REF_DECLARE()
Fst* fst; Fst* fst;
......
...@@ -38,11 +38,14 @@ static int tfileWriteHeader(TFileWriter* writer); ...@@ -38,11 +38,14 @@ static int tfileWriteHeader(TFileWriter* writer);
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset); static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
static int tfileWriteData(TFileWriter* write, TFileValue* tval); static int tfileWriteData(TFileWriter* write, TFileValue* tval);
static int tfileReadLoadHeader(TFileReader* reader); static int tfileReadLoadHeader(TFileReader* reader);
static int tfileReadLoadFst(TFileReader* reader); static int tfileReadLoadFst(TFileReader* reader);
static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
static void tfileReadRef(TFileReader* reader);
static void tfileReadUnRef(TFileReader* reader);
static int tfileGetFileList(const char* path, SArray* result); static int tfileGetFileList(const char* path, SArray* result);
static int tfileRmExpireFile(SArray* result);
static void tfileDestroyFileName(void* elem); static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b); static int tfileCompare(const void* a, const void* b);
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version); static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
...@@ -58,6 +61,8 @@ TFileCache* tfileCacheCreate(const char* path) { ...@@ -58,6 +61,8 @@ TFileCache* tfileCacheCreate(const char* path) {
SArray* files = taosArrayInit(4, sizeof(void*)); SArray* files = taosArrayInit(4, sizeof(void*));
tfileGetFileList(path, files); tfileGetFileList(path, files);
taosArraySort(files, tfileCompare); taosArraySort(files, tfileCompare);
tfileRmExpireFile(files);
uint64_t suid; uint64_t suid;
int32_t colId, version; int32_t colId, version;
for (size_t i = 0; i < taosArrayGetSize(files); i++) { for (size_t i = 0; i < taosArrayGetSize(files); i++) {
...@@ -66,29 +71,29 @@ TFileCache* tfileCacheCreate(const char* path) { ...@@ -66,29 +71,29 @@ TFileCache* tfileCacheCreate(const char* path) {
indexInfo("try parse invalid file: %s, skip it", file); indexInfo("try parse invalid file: %s, skip it", file);
continue; continue;
} }
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 64); WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 64);
if (wc == NULL) { if (wc == NULL) {
indexError("failed to open index: %s", file); indexError("failed to open index: %s", file);
goto End; goto End;
} }
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
if (0 != tfileReadLoadHeader(reader)) { if (0 != tfileReadLoadHeader(reader)) {
tfileReaderDestroy(reader); tfileReaderDestroy(reader);
indexError("failed to load index header, index file: %s", file); indexError("failed to load index header, index file: %s", file);
goto End; goto End;
} }
if (0 != tfileReadLoadFst(reader)) { if (0 != tfileReadLoadFst(reader)) {
tfileReaderDestroy(reader); tfileReaderDestroy(reader);
indexError("failed to load index fst, index file: %s", file); indexError("failed to load index fst, index file: %s", file);
goto End;
} }
tfileReadRef(reader);
// loader fst and validate it // loader fst and validate it
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
TFileCacheKey key = {.suid = header->suid, TFileCacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
.version = header->version,
.colName = header->colName,
.nColName = strlen(header->colName),
.colType = header->colType};
char buf[128] = {0}; char buf[128] = {0};
tfileSerialCacheKey(&key, buf); tfileSerialCacheKey(&key, buf);
...@@ -110,7 +115,7 @@ void tfileCacheDestroy(TFileCache* tcache) { ...@@ -110,7 +115,7 @@ void tfileCacheDestroy(TFileCache* tcache) {
TFileReader* p = *reader; TFileReader* p = *reader;
indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, p->header.colType); indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, p->header.colType);
tfileReaderDestroy(p); tfileReadUnRef(p);
reader = taosHashIterate(tcache->tableCache, reader); reader = taosHashIterate(tcache->tableCache, reader);
} }
taosHashCleanup(tcache->tableCache); taosHashCleanup(tcache->tableCache);
...@@ -120,12 +125,24 @@ void tfileCacheDestroy(TFileCache* tcache) { ...@@ -120,12 +125,24 @@ void tfileCacheDestroy(TFileCache* tcache) {
TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) { TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) {
char buf[128] = {0}; char buf[128] = {0};
tfileSerialCacheKey(key, buf); tfileSerialCacheKey(key, buf);
TFileReader* reader = taosHashGet(tcache->tableCache, buf, strlen(buf)); TFileReader* reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
tfileReadRef(reader);
return reader; return reader;
} }
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) { void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) {
char buf[128] = {0}; char buf[128] = {0};
tfileSerialCacheKey(key, buf); tfileSerialCacheKey(key, buf);
// remove last version index reader
TFileReader** p = taosHashGet(tcache->tableCache, buf, strlen(buf));
if (*p != NULL) {
TFileReader* oldReader = *p;
taosHashRemove(tcache->tableCache, buf, strlen(buf));
tfileReadUnRef(oldReader);
}
tfileReadRef(reader);
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
return; return;
} }
...@@ -147,25 +164,29 @@ void tfileReaderDestroy(TFileReader* reader) { ...@@ -147,25 +164,29 @@ void tfileReaderDestroy(TFileReader* reader) {
} }
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) { int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) {
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType;
int ret = -1;
// refactor to callback later // refactor to callback later
if (query->qType == QUERY_TERM) { if (qtype == QUERY_TERM) {
uint64_t offset; uint64_t offset;
FstSlice key = fstSliceCreate(term->colVal, term->nColVal); FstSlice key = fstSliceCreate(term->colVal, term->nColVal);
if (fstGet(reader->fst, &key, &offset)) { if (fstGet(reader->fst, &key, &offset)) {
return tfileReadLoadTableIds(reader, offset, result); indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, term->colVal);
ret = tfileReadLoadTableIds(reader, offset, result);
} else { } else {
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found in tindex", term->suid, term->colName, term->colVal); indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, term->colVal);
} }
return 0; fstSliceDestroy(&key);
} else if (query->qType == QUERY_PREFIX) { } else if (qtype == QUERY_PREFIX) {
// handle later // handle later
// //
} else { } else {
// handle later // handle later
} }
return 0; tfileReadUnRef(reader);
return ret;
} }
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
...@@ -209,7 +230,6 @@ int tfileWriterPut(TFileWriter* tw, void* data) { ...@@ -209,7 +230,6 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
int32_t tbsz = taosArrayGetSize(v->tableId); int32_t tbsz = taosArrayGetSize(v->tableId);
fstOffset += TF_TABLE_TATOAL_SIZE(tbsz); fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
} }
// check result or not
tfileWriteFstOffset(tw, fstOffset); tfileWriteFstOffset(tw, fstOffset);
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
...@@ -237,6 +257,7 @@ int tfileWriterPut(TFileWriter* tw, void* data) { ...@@ -237,6 +257,7 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
// write reversed data in buf to tindex // write reversed data in buf to tindex
tw->ctx->write(tw->ctx, buf, offset); tw->ctx->write(tw->ctx, buf, offset);
} }
tfree(buf);
// write fst // write fst
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
...@@ -244,11 +265,8 @@ int tfileWriterPut(TFileWriter* tw, void* data) { ...@@ -244,11 +265,8 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
TFileValue* v = taosArrayGetP((SArray*)data, i); TFileValue* v = taosArrayGetP((SArray*)data, i);
if (tfileWriteData(tw, v) == 0) { if (tfileWriteData(tw, v) == 0) {
// //
//
} }
} }
tfree(buf);
return 0; return 0;
} }
void tfileWriterDestroy(TFileWriter* tw) { void tfileWriterDestroy(TFileWriter* tw) {
...@@ -270,17 +288,19 @@ void IndexTFileDestroy(IndexTFile* tfile) { ...@@ -270,17 +288,19 @@ void IndexTFileDestroy(IndexTFile* tfile) {
} }
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
if (tfile == NULL) { return -1; } int ret = -1;
if (tfile == NULL) { return ret; }
IndexTFile* pTfile = (IndexTFile*)tfile; IndexTFile* pTfile = (IndexTFile*)tfile;
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
TFileCacheKey key = {.suid = term->suid, .colType = term->colType, .version = 0, .colName = term->colName, .nColName = term->nColName}; TFileCacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
return tfileReaderSearch(reader, query, result); return tfileReaderSearch(reader, query, result);
} }
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) { int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName, .version = 1}; // TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName, .version =
// 1};
return 0; return 0;
} }
...@@ -309,8 +329,7 @@ static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) { ...@@ -309,8 +329,7 @@ static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
return 0; return 0;
} }
static int tfileWriteHeader(TFileWriter* writer) { static int tfileWriteHeader(TFileWriter* writer) {
char buf[TFILE_HEADER_NO_FST] = {0}; char buf[TFILE_HEADER_NO_FST] = {0};
char* p = buf;
TFileHeader* header = &writer->header; TFileHeader* header = &writer->header;
memcpy(buf, (char*)header, sizeof(buf)); memcpy(buf, (char*)header, sizeof(buf));
...@@ -338,8 +357,7 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) { ...@@ -338,8 +357,7 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
} }
static int tfileReadLoadHeader(TFileReader* reader) { static int tfileReadLoadHeader(TFileReader* reader) {
// TODO simple tfile header later // TODO simple tfile header later
char buf[TFILE_HEADER_SIZE] = {0}; char buf[TFILE_HEADER_SIZE] = {0};
char* p = buf;
int64_t nread = reader->ctx->read(reader->ctx, buf, sizeof(buf)); int64_t nread = reader->ctx->read(reader->ctx, buf, sizeof(buf));
assert(nread == sizeof(buf)); assert(nread == sizeof(buf));
...@@ -368,20 +386,32 @@ static int tfileReadLoadFst(TFileReader* reader) { ...@@ -368,20 +386,32 @@ static int tfileReadLoadFst(TFileReader* reader) {
static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) { static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
int32_t nid; int32_t nid;
WriterCtx* ctx = reader->ctx; WriterCtx* ctx = reader->ctx;
int32_t nread = ctx->readFrom(ctx, (char*)&nid, sizeof(nid), offset);
int32_t nread = ctx->readFrom(ctx, (char*)&nid, sizeof(nid), offset);
assert(sizeof(nid) == nread); assert(sizeof(nid) == nread);
char* buf = calloc(1, sizeof(uint64_t) * nid); int32_t total = sizeof(uint64_t) * nid;
char* buf = calloc(1, total);
if (buf == NULL) { return -1; } if (buf == NULL) { return -1; }
nread = ctx->read(ctx, buf, sizeof(uint64_t) * nid); nread = ctx->read(ctx, buf, total);
uint64_t* ids = (uint64_t*)buf; assert(total == nread);
for (int32_t i = 0; i < nid; i++) { for (int32_t i = 0; i < nid; i++) {
taosArrayPush(result, ids + i); taosArrayPush(result, (uint64_t*)buf + i);
} }
free(buf); free(buf);
return 0; return 0;
} }
static void tfileReadRef(TFileReader* reader) {
int ref = T_REF_INC(reader);
UNUSED(ref);
}
static void tfileReadUnRef(TFileReader* reader) {
int ref = T_REF_DEC(reader);
if (ref == 0) { tfileReaderDestroy(reader); }
}
static int tfileGetFileList(const char* path, SArray* result) { static int tfileGetFileList(const char* path, SArray* result) {
DIR* dir = opendir(path); DIR* dir = opendir(path);
...@@ -397,6 +427,10 @@ static int tfileGetFileList(const char* path, SArray* result) { ...@@ -397,6 +427,10 @@ static int tfileGetFileList(const char* path, SArray* result) {
closedir(dir); closedir(dir);
return 0; return 0;
} }
static int tfileRmExpireFile(SArray* result) {
// TODO(yihao): remove expire tindex after restart
return 0;
}
static void tfileDestroyFileName(void* elem) { static void tfileDestroyFileName(void* elem) {
char* p = *(char**)elem; char* p = *(char**)elem;
free(p); free(p);
...@@ -423,7 +457,5 @@ static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) { ...@@ -423,7 +457,5 @@ static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
SERIALIZE_VAR_TO_BUF(buf, '_', char); SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_MEM_TO_BUF(buf, key, colType); SERIALIZE_MEM_TO_BUF(buf, key, colType);
SERIALIZE_VAR_TO_BUF(buf, '_', char); SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_MEM_TO_BUF(buf, key, version);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册