未验证 提交 8e71d539 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #14404 from taosdata/feat/idxRefact

feat: refactor idx code
......@@ -627,6 +627,8 @@ int32_t* taosGetErrno();
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
#define TSDB_CODE_INDEX_INVALID_FILE TAOS_DEF_ERROR_CODE(0, 0x3201)
//tmq
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
......
......@@ -21,7 +21,7 @@ extern "C" {
#endif
#include "indexFstAutomation.h"
#include "indexFstCountingWriter.h"
#include "indexFstFile.h"
#include "indexFstNode.h"
#include "indexFstRegistry.h"
#include "indexFstUtil.h"
......@@ -90,7 +90,7 @@ FstBuilderNode* fstUnFinishedNodesPopEmpty(FstUnFinishedNodes* nodes);
uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes* node, FstSlice bs, Output in, Output* out);
typedef struct FstBuilder {
FstCountingWriter* wrt; // The FST raw data is written directly to `wtr`.
IdxFstFile* wrt; // The FST raw data is written directly to `wtr`.
FstUnFinishedNodes* unfinished; // The stack of unfinished nodes
FstRegistry* registry; // A map of finished nodes.
FstSlice last; // The last word added
......@@ -125,9 +125,9 @@ FstState fstStateCreateFrom(FstSlice* data, CompiledAddr addr);
FstState fstStateCreate(State state);
// compile
void fstStateCompileForOneTransNext(FstCountingWriter* w, CompiledAddr addr, uint8_t inp);
void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTransition* trn);
void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuilderNode* node);
void fstStateCompileForOneTransNext(IdxFstFile* w, CompiledAddr addr, uint8_t inp);
void fstStateCompileForOneTrans(IdxFstFile* w, CompiledAddr addr, FstTransition* trn);
void fstStateCompileForAnyTrans(IdxFstFile* w, CompiledAddr addr, FstBuilderNode* node);
// set_comm_input
void fstStateSetCommInput(FstState* state, uint8_t inp);
......@@ -282,7 +282,7 @@ FStmSt* stmBuilderIntoStm(FStmBuilder* sb);
bool fstVerify(Fst* fst);
// refactor this function
bool fstBuilderNodeCompileTo(FstBuilderNode* b, FstCountingWriter* wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
bool fstBuilderNodeCompileTo(FstBuilderNode* b, IdxFstFile* wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
typedef struct StreamState {
FstNode* node;
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __INDEX_FST_COUNTING_WRITER_H__
#define __INDEX_FST_COUNTING_WRITER_H__
#ifndef __INDEX_FST_FILE_H__
#define __INDEX_FST_FILE_H__
#include "indexInt.h"
......@@ -29,61 +29,61 @@ extern "C" {
static char tmpFile[] = "./index";
typedef enum WriterType { TMemory, TFile } WriterType;
typedef struct WriterCtx {
int (*write)(struct WriterCtx* ctx, uint8_t* buf, int len);
int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len);
int (*flush)(struct WriterCtx* ctx);
int (*readFrom)(struct WriterCtx* ctx, uint8_t* buf, int len, int32_t offset);
int (*size)(struct WriterCtx* ctx);
typedef struct IFileCtx {
int (*write)(struct IFileCtx* ctx, uint8_t* buf, int len);
int (*read)(struct IFileCtx* ctx, uint8_t* buf, int len);
int (*flush)(struct IFileCtx* ctx);
int (*readFrom)(struct IFileCtx* ctx, uint8_t* buf, int len, int32_t offset);
int (*size)(struct IFileCtx* ctx);
WriterType type;
union {
struct {
TdFilePtr pFile;
bool readOnly;
char buf[256];
int size;
int64_t size;
#ifdef USE_MMAP
char* ptr;
#endif
} file;
struct {
int32_t capa;
int32_t cap;
char* buf;
} mem;
};
int32_t offset;
int32_t limit;
} WriterCtx;
} IFileCtx;
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len);
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len);
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset);
static int writeCtxDoFlush(WriterCtx* ctx);
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len);
static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len);
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset);
static int idxFileCtxDoFlush(IFileCtx* ctx);
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity);
void writerCtxDestroy(WriterCtx* w, bool remove);
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity);
void idxFileCtxDestroy(IFileCtx* w, bool remove);
typedef uint32_t CheckSummer;
typedef struct FstCountingWriter {
typedef struct IdxFstFile {
void* wrt; // wrap any writer that counts and checksum bytes written
uint64_t count;
CheckSummer summer;
} FstCountingWriter;
} IdxFstFile;
int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len);
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len);
int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len);
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len);
int fstCountingWriterFlush(FstCountingWriter* write);
int idxFileFlush(IdxFstFile* write);
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write);
uint32_t idxFileMaskedCheckSum(IdxFstFile* write);
FstCountingWriter* fstCountingWriterCreate(void* wtr);
void fstCountingWriterDestroy(FstCountingWriter* w);
IdxFstFile* idxFileCreate(void* wtr);
void idxFileDestroy(IdxFstFile* w);
void fstCountingWriterPackUintIn(FstCountingWriter* writer, uint64_t n, uint8_t nBytes);
uint8_t fstCountingWriterPackUint(FstCountingWriter* writer, uint64_t n);
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes);
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n);
#define FST_WRITER_COUNT(writer) (writer->count)
#define FST_WRITER_INTER_WRITER(writer) (writer->wtr)
......
......@@ -20,7 +20,7 @@
extern "C" {
#endif
#include "indexFstCountingWriter.h"
#include "indexFstFile.h"
#include "indexFstUtil.h"
#include "indexInt.h"
......@@ -46,7 +46,7 @@ FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src);
void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src);
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt,
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile' *wrt,
// CompiledAddr lastAddr, CompiledAddr startAddr);
bool fstBuilderNodeEqual(FstBuilderNode* n1, FstBuilderNode* n2);
......
......@@ -16,7 +16,7 @@
#define __INDEX_TFILE_H__
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstFile.h"
#include "indexInt.h"
#include "indexTfile.h"
#include "indexUtil.h"
......@@ -59,7 +59,7 @@ typedef struct TFileCache {
typedef struct TFileWriter {
FstBuilder* fb;
WriterCtx* ctx;
IFileCtx* ctx;
TFileHeader header;
uint32_t offset;
} TFileWriter;
......@@ -68,7 +68,7 @@ typedef struct TFileWriter {
typedef struct TFileReader {
T_REF_DECLARE()
Fst* fst;
WriterCtx* ctx;
IFileCtx* ctx;
TFileHeader header;
bool remove;
} TFileReader;
......@@ -103,7 +103,7 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* read
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName);
TFileReader* tfileReaderCreate(WriterCtx* ctx);
TFileReader* tfileReaderCreate(IFileCtx* ctx);
void tfileReaderDestroy(TFileReader* reader);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr);
void tfileReaderRef(TFileReader* reader);
......@@ -111,7 +111,7 @@ void tfileReaderUnRef(TFileReader* reader);
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type);
void tfileWriterClose(TFileWriter* tw);
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header);
TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header);
void tfileWriterDestroy(TFileWriter* tw);
int tfileWriterPut(TFileWriter* tw, void* data, bool order);
int tfileWriterFinish(TFileWriter* tw);
......
......@@ -614,7 +614,7 @@ static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
return ret;
END:
if (tw != NULL) {
writerCtxDestroy(tw->ctx, true);
idxFileCtxDestroy(tw->ctx, true);
taosMemoryFree(tw);
}
return -1;
......
......@@ -19,11 +19,11 @@
#include "tchecksum.h"
#include "tcoding.h"
static void fstPackDeltaIn(FstCountingWriter* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) {
static void fstPackDeltaIn(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) {
CompiledAddr deltaAddr = (transAddr == EMPTY_ADDRESS) ? EMPTY_ADDRESS : nodeAddr - transAddr;
fstCountingWriterPackUintIn(wrt, deltaAddr, nBytes);
idxFilePackUintIn(wrt, deltaAddr, nBytes);
}
static uint8_t fstPackDetla(FstCountingWriter* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr) {
static uint8_t fstPackDetla(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr) {
uint8_t nBytes = packDeltaSize(nodeAddr, transAddr);
fstPackDeltaIn(wrt, nodeAddr, transAddr, nBytes);
return nBytes;
......@@ -208,7 +208,7 @@ FstState fstStateCreate(State state) {
return fstStateDict[idx];
}
// compile
void fstStateCompileForOneTransNext(FstCountingWriter* w, CompiledAddr addr, uint8_t inp) {
void fstStateCompileForOneTransNext(IdxFstFile* w, CompiledAddr addr, uint8_t inp) {
FstState s = fstStateCreate(OneTransNext);
fstStateSetCommInput(&s, inp);
......@@ -216,21 +216,21 @@ void fstStateCompileForOneTransNext(FstCountingWriter* w, CompiledAddr addr, uin
uint8_t v = fstStateCommInput(&s, &null);
if (null) {
// w->write_all(&[inp])
fstCountingWriterWrite(w, &inp, 1);
idxFileWrite(w, &inp, 1);
}
fstCountingWriterWrite(w, &(s.val), 1);
idxFileWrite(w, &(s.val), 1);
// w->write_all(&[s.val])
return;
}
void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTransition* trn) {
void fstStateCompileForOneTrans(IdxFstFile* w, CompiledAddr addr, FstTransition* trn) {
Output out = trn->out;
uint8_t outPackSize = (out == 0 ? 0 : fstCountingWriterPackUint(w, out));
uint8_t outPackSize = (out == 0 ? 0 : idxFilePackUint(w, out));
uint8_t transPackSize = fstPackDetla(w, addr, trn->addr);
PackSizes packSizes = 0;
FST_SET_OUTPUT_PACK_SIZE(packSizes, outPackSize);
FST_SET_TRANSITION_PACK_SIZE(packSizes, transPackSize);
fstCountingWriterWrite(w, (char*)&packSizes, sizeof(packSizes));
idxFileWrite(w, (char*)&packSizes, sizeof(packSizes));
FstState st = fstStateCreate(OneTrans);
......@@ -239,12 +239,12 @@ void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTran
bool null = false;
uint8_t inp = fstStateCommInput(&st, &null);
if (null == true) {
fstCountingWriterWrite(w, (char*)&trn->inp, sizeof(trn->inp));
idxFileWrite(w, (char*)&trn->inp, sizeof(trn->inp));
}
fstCountingWriterWrite(w, (char*)(&(st.val)), sizeof(st.val));
idxFileWrite(w, (char*)(&(st.val)), sizeof(st.val));
return;
}
void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuilderNode* node) {
void fstStateCompileForAnyTrans(IdxFstFile* w, CompiledAddr addr, FstBuilderNode* node) {
int32_t sz = taosArrayGetSize(node->trans);
assert(sz <= 256);
......@@ -275,11 +275,11 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
if (anyOuts) {
if (FST_BUILDER_NODE_IS_FINAL(node)) {
fstCountingWriterPackUintIn(w, node->finalOutput, oSize);
idxFilePackUintIn(w, node->finalOutput, oSize);
}
for (int32_t i = sz - 1; i >= 0; i--) {
FstTransition* t = taosArrayGet(node->trans, i);
fstCountingWriterPackUintIn(w, t->out, oSize);
idxFilePackUintIn(w, t->out, oSize);
}
}
for (int32_t i = sz - 1; i >= 0; i--) {
......@@ -288,7 +288,7 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
}
for (int32_t i = sz - 1; i >= 0; i--) {
FstTransition* t = taosArrayGet(node->trans, i);
fstCountingWriterWrite(w, (char*)&t->inp, 1);
idxFileWrite(w, (char*)&t->inp, 1);
// fstPackDeltaIn(w, addr, t->addr, tSize);
}
if (sz > TRANS_INDEX_THRESHOLD) {
......@@ -306,10 +306,10 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
index[t->inp] = i;
// fstPackDeltaIn(w, addr, t->addr, tSize);
}
fstCountingWriterWrite(w, (char*)index, 256);
idxFileWrite(w, (char*)index, 256);
taosMemoryFree(index);
}
fstCountingWriterWrite(w, (char*)&packSizes, 1);
idxFileWrite(w, (char*)&packSizes, 1);
bool null = false;
fstStateStateNtrans(&st, &null);
if (null == true) {
......@@ -318,12 +318,12 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
// encoded in the state byte.
uint8_t v = 1;
if (sz == 256) {
fstCountingWriterWrite(w, (char*)&v, 1);
idxFileWrite(w, (char*)&v, 1);
} else {
fstCountingWriterWrite(w, (char*)&sz, 1);
idxFileWrite(w, (char*)&sz, 1);
}
}
fstCountingWriterWrite(w, (char*)(&(st.val)), 1);
idxFileWrite(w, (char*)(&(st.val)), 1);
return;
}
......@@ -753,7 +753,7 @@ bool fstNodeCompile(FstNode* node, void* w, CompiledAddr lastAddr, CompiledAddr
return true;
}
bool fstBuilderNodeCompileTo(FstBuilderNode* b, FstCountingWriter* wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
bool fstBuilderNodeCompileTo(FstBuilderNode* b, IdxFstFile* wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
return fstNodeCompile(NULL, wrt, lastAddr, startAddr, b);
}
......@@ -763,7 +763,7 @@ FstBuilder* fstBuilderCreate(void* w, FstType ty) {
return b;
}
b->wrt = fstCountingWriterCreate(w);
b->wrt = idxFileCreate(w);
b->unfinished = fstUnFinishedNodesCreate();
b->registry = fstRegistryCreate(10000, 2);
b->last = fstSliceCreate(NULL, 0);
......@@ -773,12 +773,12 @@ FstBuilder* fstBuilderCreate(void* w, FstType ty) {
char buf64[8] = {0};
void* pBuf64 = buf64;
taosEncodeFixedU64(&pBuf64, VERSION);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
idxFileWrite(b->wrt, buf64, sizeof(buf64));
pBuf64 = buf64;
memset(buf64, 0, sizeof(buf64));
taosEncodeFixedU64(&pBuf64, ty);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
idxFileWrite(b->wrt, buf64, sizeof(buf64));
return b;
}
......@@ -787,7 +787,7 @@ void fstBuilderDestroy(FstBuilder* b) {
return;
}
fstCountingWriterDestroy(b->wrt);
idxFileDestroy(b->wrt);
fstUnFinishedNodesDestroy(b->unfinished);
fstRegistryDestroy(b->registry);
fstSliceDestroy(&b->last);
......@@ -905,21 +905,19 @@ void* fstBuilderInsertInner(FstBuilder* b) {
void* pBuf64 = buf64;
taosEncodeFixedU64(&pBuf64, b->len);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
idxFileWrite(b->wrt, buf64, sizeof(buf64));
pBuf64 = buf64;
taosEncodeFixedU64(&pBuf64, rootAddr);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
idxFileWrite(b->wrt, buf64, sizeof(buf64));
char buf32[4] = {0};
void* pBuf32 = buf32;
uint32_t sum = fstCountingWriterMaskedCheckSum(b->wrt);
uint32_t sum = idxFileMaskedCheckSum(b->wrt);
taosEncodeFixedU32(&pBuf32, sum);
fstCountingWriterWrite(b->wrt, buf32, sizeof(buf32));
idxFileWrite(b->wrt, buf32, sizeof(buf32));
fstCountingWriterFlush(b->wrt);
// fstCountingWriterDestroy(b->wrt);
// b->wrt = NULL;
idxFileFlush(b->wrt);
return b->wrt;
}
void fstBuilderFinish(FstBuilder* b) { fstBuilderInsertInner(b); }
......
......@@ -61,9 +61,10 @@ void dfaBuilderDestroy(FstDfaBuilder *builder) {
pIter = taosHashIterate(builder->cache, pIter);
}
taosHashCleanup(builder->cache);
taosMemoryFree(builder);
}
FstDfa *dfaBuilderBuild(FstDfaBuilder *builder) {
FstDfa *dfaBuilder(FstDfaBuilder *builder) {
uint32_t sz = taosArrayGetSize(builder->dfa->insts);
FstSparseSet *cur = sparSetCreate(sz);
FstSparseSet *nxt = sparSetCreate(sz);
......
......@@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "indexFstCountingWriter.h"
#include "indexFstFile.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "os.h"
#include "tutil.h"
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
if (ctx->type == TFile) {
assert(len == taosWriteFile(ctx->file.pFile, buf, len));
} else {
......@@ -28,7 +28,7 @@ static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
ctx->offset += len;
return len;
}
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
int nRead = 0;
if (ctx->type == TFile) {
#ifdef USE_MMAP
......@@ -44,7 +44,7 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
return nRead;
}
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) {
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
int nRead = 0;
if (ctx->type == TFile) {
// tfLseek(ctx->file.pFile, offset, 0);
......@@ -61,7 +61,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
}
return nRead;
}
static int writeCtxGetSize(WriterCtx* ctx) {
static int idxFileCtxGetSize(IFileCtx* ctx) {
if (ctx->type == TFile) {
int64_t file_size = 0;
taosStatFile(ctx->file.buf, &file_size, NULL);
......@@ -69,7 +69,7 @@ static int writeCtxGetSize(WriterCtx* ctx) {
}
return 0;
}
static int writeCtxDoFlush(WriterCtx* ctx) {
static int idxFileCtxDoFlush(IFileCtx* ctx) {
if (ctx->type == TFile) {
// taosFsyncFile(ctx->file.pFile);
taosFsyncFile(ctx->file.pFile);
......@@ -80,8 +80,8 @@ static int writeCtxDoFlush(WriterCtx* ctx) {
return 1;
}
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
WriterCtx* ctx = taosMemoryCalloc(1, sizeof(WriterCtx));
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
if (ctx == NULL) {
return NULL;
}
......@@ -90,39 +90,36 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
if (ctx->type == TFile) {
// ugly code, refactor later
ctx->file.readOnly = readOnly;
memcpy(ctx->file.buf, path, strlen(path));
if (readOnly == false) {
// ctx->file.pFile = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
taosFtruncateFile(ctx->file.pFile, 0);
int64_t file_size;
taosStatFile(path, &file_size, NULL);
ctx->file.size = (int)file_size;
taosStatFile(path, &ctx->file.size, NULL);
// ctx->file.size = (int)size;
} else {
// ctx->file.pFile = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
int64_t file_size = 0;
taosFStatFile(ctx->file.pFile, &file_size, NULL);
ctx->file.size = (int)file_size;
int64_t size = 0;
taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
ctx->file.size = (int)size;
#ifdef USE_MMAP
ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size);
#endif
}
memcpy(ctx->file.buf, path, strlen(path));
if (ctx->file.pFile == NULL) {
indexError("failed to open file, error %d", errno);
goto END;
}
} else if (ctx->type == TMemory) {
ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
ctx->mem.capa = capacity;
ctx->mem.cap = capacity;
}
ctx->write = writeCtxDoWrite;
ctx->read = writeCtxDoRead;
ctx->flush = writeCtxDoFlush;
ctx->readFrom = writeCtxDoReadFrom;
ctx->size = writeCtxGetSize;
ctx->write = idxFileCtxDoWrite;
ctx->read = idxFileCtxDoRead;
ctx->flush = idxFileCtxDoFlush;
ctx->readFrom = idxFileCtxDoReadFrom;
ctx->size = idxFileCtxGetSize;
ctx->offset = 0;
ctx->limit = capacity;
......@@ -135,7 +132,7 @@ END:
taosMemoryFree(ctx);
return NULL;
}
void writerCtxDestroy(WriterCtx* ctx, bool remove) {
void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
if (ctx->type == TMemory) {
taosMemoryFree(ctx->mem.buf);
} else {
......@@ -149,9 +146,6 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
if (ctx->file.readOnly == false) {
int64_t file_size = 0;
taosStatFile(ctx->file.buf, &file_size, NULL);
// struct stat fstat;
// stat(ctx->file.buf, &fstat);
// indexError("write file size: %d", (int)(fstat.st_size));
}
if (remove) {
unlink(ctx->file.buf);
......@@ -160,30 +154,29 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
taosMemoryFree(ctx);
}
FstCountingWriter* fstCountingWriterCreate(void* wrt) {
FstCountingWriter* cw = taosMemoryCalloc(1, sizeof(FstCountingWriter));
IdxFstFile* idxFileCreate(void* wrt) {
IdxFstFile* cw = taosMemoryCalloc(1, sizeof(IdxFstFile));
if (cw == NULL) {
return NULL;
}
cw->wrt = wrt;
//(void *)(writerCtxCreate(TFile, readOnly));
return cw;
}
void fstCountingWriterDestroy(FstCountingWriter* cw) {
void idxFileDestroy(IdxFstFile* cw) {
// free wrt object: close fd or free mem
fstCountingWriterFlush(cw);
// writerCtxDestroy((WriterCtx *)(cw->wrt));
idxFileFlush(cw);
// idxFileCtxDestroy((IFileCtx *)(cw->wrt));
taosMemoryFree(cw);
}
int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len) {
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
if (write == NULL) {
return 0;
}
// update checksum
// write data to file/socket or mem
WriterCtx* ctx = write->wrt;
IFileCtx* ctx = write->wrt;
int nWrite = ctx->write(ctx, buf, len);
assert(nWrite == len);
......@@ -192,42 +185,41 @@ int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len)
write->summer = taosCalcChecksum(write->summer, buf, len);
return len;
}
int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) {
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
if (write == NULL) {
return 0;
}
WriterCtx* ctx = write->wrt;
IFileCtx* ctx = write->wrt;
int nRead = ctx->read(ctx, buf, len);
// assert(nRead == len);
return nRead;
}
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) {
uint32_t idxFileMaskedCheckSum(IdxFstFile* write) {
// opt
return write->summer;
}
int fstCountingWriterFlush(FstCountingWriter* write) {
WriterCtx* ctx = write->wrt;
int idxFileFlush(IdxFstFile* write) {
IFileCtx* ctx = write->wrt;
ctx->flush(ctx);
// write->wtr->flush
return 1;
}
void fstCountingWriterPackUintIn(FstCountingWriter* writer, uint64_t n, uint8_t nBytes) {
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes) {
assert(1 <= nBytes && nBytes <= 8);
uint8_t* buf = taosMemoryCalloc(8, sizeof(uint8_t));
for (uint8_t i = 0; i < nBytes; i++) {
buf[i] = (uint8_t)n;
n = n >> 8;
}
fstCountingWriterWrite(writer, buf, nBytes);
idxFileWrite(writer, buf, nBytes);
taosMemoryFree(buf);
return;
}
uint8_t fstCountingWriterPackUint(FstCountingWriter* writer, uint64_t n) {
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n) {
uint8_t nBytes = packSize(n);
fstCountingWriterPackUintIn(writer, n, nBytes);
idxFilePackUintIn(writer, n, nBytes);
return nBytes;
}
......@@ -95,7 +95,7 @@ void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) {
}
}
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile *wrt, CompiledAddr lastAddr, CompiledAddr
// startAddr) {
// size_t sz = taosArrayGetSize(b->trans);
......
......@@ -75,7 +75,6 @@ CompiledAddr unpackDelta(char* data, uint64_t len, uint64_t nodeAddr) {
}
// fst slice func
//
FstSlice fstSliceCreate(uint8_t* data, uint64_t len) {
FstString* str = (FstString*)taosMemoryMalloc(sizeof(FstString));
......@@ -164,16 +163,3 @@ int fstSliceCompare(FstSlice* a, FstSlice* b) {
return 0;
}
}
// FstStack* fstStackCreate(size_t elemSize, StackFreeElem freeFn) {
// FstStack *s = taosMemoryCalloc(1, sizeof(FstStack));
// if (s == NULL) { return NULL; }
// s->
// s->freeFn
//
//}
// void *fstStackPush(FstStack *s, void *elem);
// void *fstStackTop(FstStack *s);
// size_t fstStackLen(FstStack *s);
// void *fstStackGetAt(FstStack *s, size_t i);
// void fstStackDestory(FstStack *);
......@@ -16,7 +16,7 @@
#include "index.h"
#include "indexComm.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstFile.h"
#include "indexUtil.h"
#include "taosdef.h"
#include "taoserror.h"
......@@ -103,7 +103,7 @@ TFileCache* tfileCacheCreate(const char* path) {
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
char* file = taosArrayGetP(files, i);
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64);
IFileCtx* wc = idxFileCtxCreate(TFile, file, true, 1024 * 1024 * 64);
if (wc == NULL) {
indexError("failed to open index:%s", file);
goto End;
......@@ -175,7 +175,7 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
tfileReaderRef(reader);
return;
}
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
TFileReader* tfileReaderCreate(IFileCtx* ctx) {
TFileReader* reader = taosMemoryCalloc(1, sizeof(TFileReader));
if (reader == NULL) {
return NULL;
......@@ -216,7 +216,7 @@ void tfileReaderDestroy(TFileReader* reader) {
} else {
indexInfo("%s is not removed", reader->ctx->file.buf);
}
writerCtxDestroy(reader->ctx, reader->remove);
idxFileCtxDestroy(reader->ctx, reader->remove);
taosMemoryFree(reader);
}
......@@ -490,7 +490,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version);
// indexInfo("open write file name %s", fullname);
WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
IFileCtx* wcx = idxFileCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
if (wcx == NULL) {
return NULL;
}
......@@ -507,18 +507,18 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const c
char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version);
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
IFileCtx* wc = idxFileCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
if (wc == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
return NULL;
}
indexTrace("open read file name:%s, file size: %d", wc->file.buf, wc->file.size);
indexTrace("open read file name:%s, file size: %" PRId64 "", wc->file.buf, wc->file.size);
TFileReader* reader = tfileReaderCreate(wc);
return reader;
}
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header) {
TFileWriter* tw = taosMemoryCalloc(1, sizeof(TFileWriter));
if (tw == NULL) {
indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid);
......@@ -609,14 +609,14 @@ void tfileWriterClose(TFileWriter* tw) {
if (tw == NULL) {
return;
}
writerCtxDestroy(tw->ctx, false);
idxFileCtxDestroy(tw->ctx, false);
taosMemoryFree(tw);
}
void tfileWriterDestroy(TFileWriter* tw) {
if (tw == NULL) {
return;
}
writerCtxDestroy(tw->ctx, false);
idxFileCtxDestroy(tw->ctx, false);
taosMemoryFree(tw);
}
......@@ -892,7 +892,7 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
return 0;
}
static int tfileReaderLoadFst(TFileReader* reader) {
WriterCtx* ctx = reader->ctx;
IFileCtx* ctx = reader->ctx;
int size = ctx->size(ctx);
// current load fst into memory, refactor it later
......@@ -905,8 +905,9 @@ static int tfileReaderLoadFst(TFileReader* reader) {
int64_t ts = taosGetTimestampUs();
int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset);
int64_t cost = taosGetTimestampUs() - ts;
indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %" PRId64 "us", nread,
reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost);
indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %" PRId64 ", time cost: %" PRId64
"us",
nread, reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost);
// we assuse fst size less than FST_MAX_SIZE
assert(nread > 0 && nread <= fstSize);
......@@ -919,7 +920,7 @@ static int tfileReaderLoadFst(TFileReader* reader) {
}
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
// TODO(yihao): opt later
WriterCtx* ctx = reader->ctx;
IFileCtx* ctx = reader->ctx;
// add block cache
char block[4096] = {0};
int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset);
......@@ -952,7 +953,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
}
static int tfileReaderVerify(TFileReader* reader) {
// just validate header and Footer, file corrupted also shuild be verified later
WriterCtx* ctx = reader->ctx;
IFileCtx* ctx = reader->ctx;
uint64_t tMagicNumber = 0;
......
......@@ -7,7 +7,6 @@
#include "index.h"
#include "indexCache.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
......@@ -20,7 +19,7 @@ class FstWriter {
public:
FstWriter() {
taosRemoveFile(fileName.c_str());
_wc = writerCtxCreate(TFile, fileName.c_str(), false, 64 * 1024 * 1024);
_wc = idxFileCtxCreate(TFile, fileName.c_str(), false, 64 * 1024 * 1024);
_b = fstBuilderCreate(_wc, 0);
}
bool Put(const std::string& key, uint64_t val) {
......@@ -38,25 +37,25 @@ class FstWriter {
fstBuilderFinish(_b);
fstBuilderDestroy(_b);
writerCtxDestroy(_wc, false);
idxFileCtxDestroy(_wc, false);
}
private:
FstBuilder* _b;
WriterCtx* _wc;
IFileCtx* _wc;
};
class FstReadMemory {
public:
FstReadMemory(int32_t size, const std::string& fileName = TD_TMP_DIR_PATH "tindex.tindex") {
_wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
_w = fstCountingWriterCreate(_wc);
_wc = idxFileCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
_w = idxFileCreate(_wc);
_size = size;
memset((void*)&_s, 0, sizeof(_s));
}
bool init() {
char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
int nRead = idxFileRead(_w, (uint8_t*)buf, _size);
if (nRead <= 0) {
return false;
}
......@@ -141,17 +140,17 @@ class FstReadMemory {
}
~FstReadMemory() {
fstCountingWriterDestroy(_w);
idxFileDestroy(_w);
fstDestroy(_fst);
fstSliceDestroy(&_s);
writerCtxDestroy(_wc, false);
idxFileCtxDestroy(_wc, false);
}
private:
FstCountingWriter* _w;
IdxFstFile* _w;
Fst* _fst;
FstSlice _s;
WriterCtx* _wc;
IFileCtx* _wc;
int32_t _size;
};
......
......@@ -8,7 +8,6 @@
#include "index.h"
#include "indexCache.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
......@@ -40,7 +39,7 @@ static void EnvCleanup() {}
class FstWriter {
public:
FstWriter() {
_wc = writerCtxCreate(TFile, tindex, false, 64 * 1024 * 1024);
_wc = idxFileCtxCreate(TFile, tindex, false, 64 * 1024 * 1024);
_b = fstBuilderCreate(_wc, 0);
}
bool Put(const std::string& key, uint64_t val) {
......@@ -58,25 +57,25 @@ class FstWriter {
fstBuilderFinish(_b);
fstBuilderDestroy(_b);
writerCtxDestroy(_wc, false);
idxFileCtxDestroy(_wc, false);
}
private:
FstBuilder* _b;
WriterCtx* _wc;
IFileCtx* _wc;
};
class FstReadMemory {
public:
FstReadMemory(size_t size) {
_wc = writerCtxCreate(TFile, tindex, true, 64 * 1024);
_w = fstCountingWriterCreate(_wc);
_wc = idxFileCtxCreate(TFile, tindex, true, 64 * 1024);
_w = idxFileCreate(_wc);
_size = size;
memset((void*)&_s, 0, sizeof(_s));
}
bool init() {
char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
int nRead = idxFileRead(_w, (uint8_t*)buf, _size);
if (nRead <= 0) {
return false;
}
......@@ -130,17 +129,17 @@ class FstReadMemory {
}
~FstReadMemory() {
fstCountingWriterDestroy(_w);
idxFileDestroy(_w);
fstDestroy(_fst);
fstSliceDestroy(&_s);
writerCtxDestroy(_wc, false);
idxFileCtxDestroy(_wc, false);
}
private:
FstCountingWriter* _w;
IdxFstFile* _w;
Fst* _fst;
FstSlice _s;
WriterCtx* _wc;
IFileCtx* _wc;
size_t _size;
};
......
......@@ -20,7 +20,6 @@
#include "index.h"
#include "indexCache.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
......@@ -51,7 +50,7 @@ class DebugInfo {
class FstWriter {
public:
FstWriter() {
_wc = writerCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", false, 64 * 1024 * 1024);
_wc = idxFileCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", false, 64 * 1024 * 1024);
_b = fstBuilderCreate(NULL, 0);
}
bool Put(const std::string& key, uint64_t val) {
......@@ -64,25 +63,25 @@ class FstWriter {
fstBuilderFinish(_b);
fstBuilderDestroy(_b);
writerCtxDestroy(_wc, false);
idxFileCtxDestroy(_wc, false);
}
private:
FstBuilder* _b;
WriterCtx* _wc;
IFileCtx* _wc;
};
class FstReadMemory {
public:
FstReadMemory(size_t size) {
_wc = writerCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", true, 64 * 1024);
_w = fstCountingWriterCreate(_wc);
_wc = idxFileCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", true, 64 * 1024);
_w = idxFileCreate(_wc);
_size = size;
memset((void*)&_s, 0, sizeof(_s));
}
bool init() {
char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
int nRead = idxFileRead(_w, (uint8_t*)buf, _size);
if (nRead <= 0) {
return false;
}
......@@ -124,17 +123,17 @@ class FstReadMemory {
}
~FstReadMemory() {
fstCountingWriterDestroy(_w);
idxFileDestroy(_w);
fstDestroy(_fst);
fstSliceDestroy(&_s);
writerCtxDestroy(_wc, true);
idxFileCtxDestroy(_wc, true);
}
private:
FstCountingWriter* _w;
IdxFstFile* _w;
Fst* _fst;
FstSlice _s;
WriterCtx* _wc;
IFileCtx* _wc;
size_t _size;
};
......@@ -392,13 +391,13 @@ class TFileObj {
fileName_ = path;
WriterCtx* ctx = writerCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024);
IFileCtx* ctx = idxFileCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024);
writer_ = tfileWriterCreate(ctx, &header);
return writer_ != NULL ? true : false;
}
bool InitReader() {
WriterCtx* ctx = writerCtxCreate(TFile, fileName_.c_str(), true, 64 * 1024 * 1024);
IFileCtx* ctx = idxFileCtxCreate(TFile, fileName_.c_str(), true, 64 * 1024 * 1024);
reader_ = tfileReaderCreate(ctx);
return reader_ != NULL ? true : false;
}
......
......@@ -7,7 +7,6 @@
#include "index.h"
#include "indexCache.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
......
......@@ -8,7 +8,6 @@
#include "indexCache.h"
#include "indexComm.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
......
......@@ -479,6 +479,10 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
}
return true;
}
static int32_t transGetRefMgt() {
//
return refMgt;
}
static void transInitEnv() {
refMgt = transOpenExHandleMgt(50000);
......@@ -486,8 +490,9 @@ static void transInitEnv() {
}
static void transDestroyEnv() {
// close ref
transCloseExHandleMgt(refMgt);
transCloseExHandleMgt();
}
void transInit() {
// init env
taosThreadOnce(&transModuleInit, transInitEnv);
......@@ -502,25 +507,25 @@ int32_t transOpenExHandleMgt(int size) {
}
void transCloseExHandleMgt() {
// close ref
taosCloseRef(refMgt);
taosCloseRef(transGetRefMgt());
}
int64_t transAddExHandle(void* p) {
// acquire extern handle
return taosAddRef(refMgt, p);
return taosAddRef(transGetRefMgt(), p);
}
int32_t transRemoveExHandle(int64_t refId) {
// acquire extern handle
return taosRemoveRef(refMgt, refId);
return taosRemoveRef(transGetRefMgt(), refId);
}
SExHandle* transAcquireExHandle(int64_t refId) {
// acquire extern handle
return (SExHandle*)taosAcquireRef(refMgt, refId);
return (SExHandle*)taosAcquireRef(transGetRefMgt(), refId);
}
int32_t transReleaseExHandle(int64_t refId) {
// release extern handle
return taosReleaseRef(refMgt, refId);
return taosReleaseRef(transGetRefMgt(), refId);
}
void transDestoryExHandle(void* handle) {
if (handle == NULL) {
......
......@@ -598,6 +598,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Invalid index file")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册