未验证 提交 3fe3222f 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9219 from taosdata/feature/index_cache

Feature/index cache
...@@ -76,7 +76,7 @@ struct SIndexMultiTermQuery { ...@@ -76,7 +76,7 @@ struct SIndexMultiTermQuery {
// field and key; // field and key;
typedef struct SIndexTerm { typedef struct SIndexTerm {
int64_t suid; int64_t suid;
SIndexOperOnColumn operType; // oper type, add/del/update SIndexOperOnColumn operType; // oper type, add/del/update
uint8_t colType; // term data type, str/interger/json uint8_t colType; // term data type, str/interger/json
char *colName; char *colName;
......
...@@ -34,8 +34,14 @@ typedef struct WriterCtx { ...@@ -34,8 +34,14 @@ typedef struct WriterCtx {
int (*flush)(struct WriterCtx *ctx); int (*flush)(struct WriterCtx *ctx);
WriterType type; WriterType type;
union { union {
int fd; struct {
void *mem; int fd;
bool readOnly;
} file;
struct {
int32_t capa;
char *buf;
} mem;
}; };
int32_t offset; int32_t offset;
int32_t limit; int32_t limit;
...@@ -45,7 +51,7 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len); ...@@ -45,7 +51,7 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len);
static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len); static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len);
static int writeCtxDoFlush(WriterCtx *ctx); static int writeCtxDoFlush(WriterCtx *ctx);
WriterCtx* writerCtxCreate(WriterType type, bool readOnly); WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int32_t capacity);
void writerCtxDestroy(WriterCtx *w); void writerCtxDestroy(WriterCtx *w);
typedef uint32_t CheckSummer; typedef uint32_t CheckSummer;
...@@ -66,7 +72,7 @@ int fstCountingWriterFlush(FstCountingWriter *write); ...@@ -66,7 +72,7 @@ int fstCountingWriterFlush(FstCountingWriter *write);
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write); uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write);
FstCountingWriter *fstCountingWriterCreate(void *wtr, bool readOnly); FstCountingWriter *fstCountingWriterCreate(void *wtr);
void fstCountingWriterDestroy(FstCountingWriter *w); void fstCountingWriterDestroy(FstCountingWriter *w);
......
...@@ -18,23 +18,81 @@ ...@@ -18,23 +18,81 @@
#include "index.h" #include "index.h"
#include "indexInt.h" #include "indexInt.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tskiplist.h" #include "index_tfile.h"
#include "index_fst_counting_writer.h"
#include "index_fst.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct IndexTFile {
typedef struct TFileCacheKey {
uint64_t suid;
uint8_t colType;
int32_t version;
const char *colName;
int32_t nColName;
} TFileCacheKey;
// table cache
// refactor to LRU cache later
typedef struct TFileCache {
SHashObj *tableCache;
int16_t capacity;
// add more param
} TFileCache;
typedef struct TFileWriter {
FstBuilder *fb;
WriterCtx *wc;
} TFileWriter;
typedef struct TFileReader {
T_REF_DECLARE() T_REF_DECLARE()
Fst *fst;
} TFileReader;
typedef struct IndexTFile {
char *path;
TFileReader *tb;
TFileWriter *tw;
} IndexTFile; } IndexTFile;
typedef struct TFileWriterOpt {
uint64_t suid;
int8_t colType;
char *colName;
int32_t nColName;
int32_t version;
} TFileWriterOpt;
typedef struct TFileReaderOpt {
uint64_t suid;
char *colName;
int32_t nColName;
} TFileReaderOpt;
// tfile cache
TFileCache *tfileCacheCreate();
void tfileCacheDestroy(TFileCache *tcache);
TFileReader* tfileCacheGet(TFileCache *tcache, TFileCacheKey *key);
void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader);
TFileWriter *tfileWriterCreate(const char *suid, const char *colName);
IndexTFile *indexTFileCreate(); IndexTFile *indexTFileCreate();
int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid);
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result); int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __INDEX_UTIL_H__
#define __INDEX_UTIL_H__
#ifdef __cplusplus
extern "C" {
#endif
#define SERIALIZE_MEM_TO_BUF(buf, key, mem) \
do { \
memcpy((void *)buf, (void *)(&key->mem), sizeof(key->mem)); \
buf += sizeof(key->mem); \
} while (0)
#define SERIALIZE_STR_MEM_TO_BUF(buf, key, mem, len) \
do { \
memcpy((void *)buf, (void *)key->mem, len); \
buf += len; \
} while (0)
#define SERIALIZE_VAR_TO_BUF(buf, var, type) \
do { \
type c = var; \
assert(sizeof(var) == sizeof(type));\
memcpy((void *)buf, (void *)&c, sizeof(c)); \
buf += sizeof(c); \
} while (0)
#define SERIALIZE_STR_VAR_TO_BUF(buf, var, len) \
do { \
memcpy((void *)buf, (void *)var, len); \
buf += len;\
} while (0)
#ifdef __cplusplus
}
#endif
#endif
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "index_cache.h" #include "index_cache.h"
#include "tcompare.h" #include "tcompare.h"
#include "index_util.h"
#define MAX_INDEX_KEY_LEN 256// test only, change later #define MAX_INDEX_KEY_LEN 256// test only, change later
...@@ -110,35 +111,22 @@ int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, ...@@ -110,35 +111,22 @@ int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version,
if (cache == NULL) { return -1;} if (cache == NULL) { return -1;}
IndexCache *pCache = cache; IndexCache *pCache = cache;
// encode data // encode data
int32_t total = CACHE_KEY_LEN(term); int32_t total = CACHE_KEY_LEN(term);
char *buf = calloc(1, total); char *buf = calloc(1, total);
char *p = buf; char *p = buf;
memcpy(p, &total, sizeof(total)); SERIALIZE_VAR_TO_BUF(p, total,int32_t);
p += sizeof(total); SERIALIZE_VAR_TO_BUF(p, colId, int16_t);
memcpy(p, &colId, sizeof(colId));
p += sizeof(colId);
memcpy(p, &term->colType, sizeof(term->colType)); SERIALIZE_MEM_TO_BUF(p, term, colType);
p += sizeof(term->colType); SERIALIZE_MEM_TO_BUF(p, term, nColVal);
SERIALIZE_STR_MEM_TO_BUF(p, term, colVal, term->nColVal);
memcpy(p, &term->nColVal, sizeof(term->nColVal)); SERIALIZE_VAR_TO_BUF(p, version, int32_t);
p += sizeof(term->nColVal); SERIALIZE_VAR_TO_BUF(p, uid, uint64_t);
memcpy(p, term->colVal, term->nColVal);
p += term->nColVal;
memcpy(p, &version, sizeof(version));
p += sizeof(version);
memcpy(p, &uid, sizeof(uid));
p += sizeof(uid);
memcpy(p, &term->operType, sizeof(term->operType)); SERIALIZE_MEM_TO_BUF(p, term, operType);
p += sizeof(term->operType);
tSkipListPut(pCache->skiplist, (void *)buf); tSkipListPut(pCache->skiplist, (void *)buf);
return 0; return 0;
......
...@@ -779,7 +779,7 @@ FstBuilder *fstBuilderCreate(void *w, FstType ty) { ...@@ -779,7 +779,7 @@ FstBuilder *fstBuilderCreate(void *w, FstType ty) {
if (NULL == b) { return b; } if (NULL == b) { return b; }
b->wrt = fstCountingWriterCreate(w, false); b->wrt = fstCountingWriterCreate(w);
b->unfinished = fstUnFinishedNodesCreate(); b->unfinished = fstUnFinishedNodesCreate();
b->registry = fstRegistryCreate(10000, 2) ; b->registry = fstRegistryCreate(10000, 2) ;
b->last = fstSliceCreate(NULL, 0); b->last = fstSliceCreate(NULL, 0);
......
...@@ -23,9 +23,9 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) { ...@@ -23,9 +23,9 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) {
} }
if (ctx->type == TFile) { if (ctx->type == TFile) {
assert(len == tfWrite(ctx->fd, buf, len)); assert(len == tfWrite(ctx->file.fd, buf, len));
} else { } else {
memcpy(ctx->mem + ctx->offset, buf, len); memcpy(ctx->mem.buf+ ctx->offset, buf, len);
} }
ctx->offset += len; ctx->offset += len;
return len; return len;
...@@ -33,9 +33,9 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) { ...@@ -33,9 +33,9 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) {
static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) { static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) {
int nRead = 0; int nRead = 0;
if (ctx->type == TFile) { if (ctx->type == TFile) {
nRead = tfRead(ctx->fd, buf, len); nRead = tfRead(ctx->file.fd, buf, len);
} else { } else {
memcpy(buf, ctx->mem + ctx->offset, len); memcpy(buf, ctx->mem.buf + ctx->offset, len);
} }
ctx->offset += nRead; ctx->offset += nRead;
...@@ -44,63 +44,64 @@ static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) { ...@@ -44,63 +44,64 @@ static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) {
static int writeCtxDoFlush(WriterCtx *ctx) { static int writeCtxDoFlush(WriterCtx *ctx) {
if (ctx->type == TFile) { if (ctx->type == TFile) {
//tfFsync(ctx->fd); //tfFsync(ctx->fd);
//tfFlush(ctx->fd); //tfFlush(ctx->file.fd);
} else { } else {
// do nothing // do nothing
} }
return 1; return 1;
} }
WriterCtx* writerCtxCreate(WriterType type, bool readOnly) { WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int32_t capacity) {
WriterCtx *ctx = calloc(1, sizeof(WriterCtx)); WriterCtx *ctx = calloc(1, sizeof(WriterCtx));
if (ctx == NULL) { return NULL; } if (ctx == NULL) { return NULL; }
ctx->type = type; ctx->type = type;
if (ctx->type == TFile) { if (ctx->type == TFile) {
tfInit();
// ugly code, refactor later // ugly code, refactor later
ctx->file.readOnly = readOnly;
if (readOnly == false) { if (readOnly == false) {
ctx->fd = tfOpenCreateWriteAppend(tmpFile); ctx->file.fd = tfOpenCreateWriteAppend(tmpFile);
} else { } else {
ctx->fd = tfOpenReadWrite(tmpFile); ctx->file.fd = tfOpenReadWrite(tmpFile);
} }
if (ctx->fd < 0) { if (ctx->file.fd < 0) {
indexError("open file error %d", errno); indexError("open file error %d", errno);
} }
} else if (ctx->type == TMemory) { } else if (ctx->type == TMemory) {
ctx->mem = calloc(1, DefaultMem * sizeof(uint8_t)); ctx->mem.buf = calloc(1, sizeof(char) * capacity);
ctx->mem.capa = capacity;
} }
ctx->write = writeCtxDoWrite; ctx->write = writeCtxDoWrite;
ctx->read = writeCtxDoRead; ctx->read = writeCtxDoRead;
ctx->flush = writeCtxDoFlush; ctx->flush = writeCtxDoFlush;
ctx->offset = 0; ctx->offset = 0;
ctx->limit = DefaultMem; ctx->limit = capacity;
return ctx; return ctx;
} }
void writerCtxDestroy(WriterCtx *ctx) { void writerCtxDestroy(WriterCtx *ctx) {
if (ctx->type == TMemory) { if (ctx->type == TMemory) {
free(ctx->mem); free(ctx->mem.buf);
} else { } else {
tfClose(ctx->fd); tfClose(ctx->file.fd);
tfCleanup();
} }
free(ctx); free(ctx);
} }
FstCountingWriter *fstCountingWriterCreate(void *wrt, bool readOnly) { FstCountingWriter *fstCountingWriterCreate(void *wrt) {
FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter)); FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter));
if (cw == NULL) { return NULL; } if (cw == NULL) { return NULL; }
cw->wrt = (void *)(writerCtxCreate(TFile, readOnly)); cw->wrt = wrt;
//(void *)(writerCtxCreate(TFile, readOnly));
return cw; return cw;
} }
void fstCountingWriterDestroy(FstCountingWriter *cw) { void fstCountingWriterDestroy(FstCountingWriter *cw) {
// free wrt object: close fd or free mem // free wrt object: close fd or free mem
fstCountingWriterFlush(cw); fstCountingWriterFlush(cw);
writerCtxDestroy((WriterCtx *)(cw->wrt)); //writerCtxDestroy((WriterCtx *)(cw->wrt));
free(cw); free(cw);
} }
...@@ -124,6 +125,7 @@ int fstCountingWriterRead(FstCountingWriter *write, uint8_t *buf, uint32_t len) ...@@ -124,6 +125,7 @@ int fstCountingWriterRead(FstCountingWriter *write, uint8_t *buf, uint32_t len)
} }
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) { uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) {
return 0; return 0;
} }
int fstCountingWriterFlush(FstCountingWriter *write) { int fstCountingWriterFlush(FstCountingWriter *write) {
......
...@@ -14,6 +14,50 @@ ...@@ -14,6 +14,50 @@
*/ */
#include "index_tfile.h" #include "index_tfile.h"
#include "index_fst.h"
#include "index_util.h"
static void tfileSerialCacheKey(TFileCacheKey *key, char *buf) {
SERIALIZE_MEM_TO_BUF(buf, key, suid);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_MEM_TO_BUF(buf, key, colType);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_MEM_TO_BUF(buf, key, version);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
}
TFileCache *tfileCacheCreate() {
TFileCache *tcache = calloc(1, sizeof(TFileCache));
if (tcache == NULL) { return NULL; }
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tcache->capacity = 64;
return tcache;
}
void tfileCacheDestroy(TFileCache *tcache) {
free(tcache);
}
TFileReader *tfileCacheGet(TFileCache *tcache, TFileCacheKey *key) {
char buf[128] = {0};
tfileSerialCacheKey(key, buf);
TFileReader *reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
return reader;
}
void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader) {
char buf[128] = {0};
tfileSerialCacheKey(key, buf);
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void *));
return;
}
IndexTFile *indexTFileCreate() { IndexTFile *indexTFileCreate() {
IndexTFile *tfile = calloc(1, sizeof(IndexTFile)); IndexTFile *tfile = calloc(1, sizeof(IndexTFile));
...@@ -22,10 +66,24 @@ IndexTFile *indexTFileCreate() { ...@@ -22,10 +66,24 @@ IndexTFile *indexTFileCreate() {
void IndexTFileDestroy(IndexTFile *tfile) { void IndexTFileDestroy(IndexTFile *tfile) {
free(tfile); free(tfile);
} }
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result) { int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result) {
IndexTFile *ptfile = (IndexTFile *)tfile; IndexTFile *ptfile = (IndexTFile *)tfile;
return 0; return 0;
} }
int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid) {
TFileWriterOpt wOpt = {.suid = term->suid,
.colType = term->colType,
.colName = term->colName,
.nColName= term->nColName,
.version = 1};
return 0;
}
...@@ -45,7 +45,7 @@ class FstWriter { ...@@ -45,7 +45,7 @@ class FstWriter {
class FstReadMemory { class FstReadMemory {
public: public:
FstReadMemory(size_t size) { FstReadMemory(size_t size) {
_w = fstCountingWriterCreate(NULL, true); _w = fstCountingWriterCreate(NULL);
_size = size; _size = size;
memset((void *)&_s, 0, sizeof(_s)); memset((void *)&_s, 0, sizeof(_s));
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册