提交 4b0e8a39 编写于 作者: dengyihao's avatar dengyihao

feat: refactor index code

上级 45e9b239
......@@ -619,6 +619,8 @@ int32_t* taosGetErrno();
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
#define TSDB_CODE_INDEX_INVALID_FILE TAOS_DEF_ERROR_CODE(0, 0x3201)
//tmq
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
......
......@@ -21,7 +21,7 @@ extern "C" {
#endif
#include "indexFstAutomation.h"
#include "indexFstCountingWriter.h"
#include "indexFstFile.h"
#include "indexFstNode.h"
#include "indexFstRegistry.h"
#include "indexFstUtil.h"
......@@ -90,8 +90,8 @@ FstBuilderNode* fstUnFinishedNodesPopEmpty(FstUnFinishedNodes* nodes);
uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes* node, FstSlice bs, Output in, Output* out);
typedef struct FstBuilder {
FstCountingWriter* wrt; // The FST raw data is written directly to `wtr`.
FstUnFinishedNodes* unfinished; // The stack of unfinished nodes
IdxFstFile* wrt; // The FST raw data is written directly to `wtr`.
FstUnFinishedNodes* unfinished; // The stack of unfinished nodes
FstRegistry* registry; // A map of finished nodes.
FstSlice last; // The last word added
CompiledAddr lastAddr; // The address of the last compiled node
......@@ -125,9 +125,9 @@ FstState fstStateCreateFrom(FstSlice* data, CompiledAddr addr);
FstState fstStateCreate(State state);
// compile
void fstStateCompileForOneTransNext(FstCountingWriter* w, CompiledAddr addr, uint8_t inp);
void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTransition* trn);
void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuilderNode* node);
void fstStateCompileForOneTransNext(IdxFstFile* w, CompiledAddr addr, uint8_t inp);
void fstStateCompileForOneTrans(IdxFstFile* w, CompiledAddr addr, FstTransition* trn);
void fstStateCompileForAnyTrans(IdxFstFile* w, CompiledAddr addr, FstBuilderNode* node);
// set_comm_input
void fstStateSetCommInput(FstState* state, uint8_t inp);
......@@ -282,7 +282,7 @@ FStmSt* stmBuilderIntoStm(FStmBuilder* sb);
bool fstVerify(Fst* fst);
// refactor this function
bool fstBuilderNodeCompileTo(FstBuilderNode* b, FstCountingWriter* wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
bool fstBuilderNodeCompileTo(FstBuilderNode* b, IdxFstFile* wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
typedef struct StreamState {
FstNode* node;
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __INDEX_FST_COUNTING_WRITER_H__
#define __INDEX_FST_COUNTING_WRITER_H__
#ifndef __INDEX_FST_FILE_H__
#define __INDEX_FST_FILE_H__
#include "indexInt.h"
......@@ -29,19 +29,19 @@ extern "C" {
static char tmpFile[] = "./index";
typedef enum WriterType { TMemory, TFile } WriterType;
typedef struct WriterCtx {
int (*write)(struct WriterCtx* ctx, uint8_t* buf, int len);
int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len);
int (*flush)(struct WriterCtx* ctx);
int (*readFrom)(struct WriterCtx* ctx, uint8_t* buf, int len, int32_t offset);
int (*size)(struct WriterCtx* ctx);
typedef struct IFileCtx {
int (*write)(struct IFileCtx* ctx, uint8_t* buf, int len);
int (*read)(struct IFileCtx* ctx, uint8_t* buf, int len);
int (*flush)(struct IFileCtx* ctx);
int (*readFrom)(struct IFileCtx* ctx, uint8_t* buf, int len, int32_t offset);
int (*size)(struct IFileCtx* ctx);
WriterType type;
union {
struct {
TdFilePtr pFile;
bool readOnly;
char buf[256];
int size;
bool readOnly;
char buf[256];
int size;
#ifdef USE_MMAP
char* ptr;
#endif
......@@ -53,41 +53,41 @@ typedef struct WriterCtx {
};
int32_t offset;
int32_t limit;
} WriterCtx;
} IFileCtx;
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len);
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len);
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset);
static int writeCtxDoFlush(WriterCtx* ctx);
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len);
static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len);
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset);
static int idxFileCtxDoFlush(IFileCtx* ctx);
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity);
void writerCtxDestroy(WriterCtx* w, bool remove);
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity);
void idxFileCtxDestroy(IFileCtx* w, bool remove);
typedef uint32_t CheckSummer;
typedef struct FstCountingWriter {
typedef struct IdxFstFile {
void* wrt; // wrap any writer that counts and checksum bytes written
uint64_t count;
CheckSummer summer;
} FstCountingWriter;
} IdxFstFile;
int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len);
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len);
int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len);
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len);
int fstCountingWriterFlush(FstCountingWriter* write);
int idxFileFlush(IdxFstFile* write);
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write);
uint32_t idxFileMaskedCheckSum(IdxFstFile* write);
FstCountingWriter* fstCountingWriterCreate(void* wtr);
void fstCountingWriterDestroy(FstCountingWriter* w);
IdxFstFile* idxFileCreate(void* wtr);
void idxFileDestroy(IdxFstFile* w);
void fstCountingWriterPackUintIn(FstCountingWriter* writer, uint64_t n, uint8_t nBytes);
uint8_t fstCountingWriterPackUint(FstCountingWriter* writer, uint64_t n);
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes);
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n);
#define FST_WRITER_COUNT(writer) (writer->count)
#define FST_WRITER_COUNT(writer) (writer->count)
#define FST_WRITER_INTER_WRITER(writer) (writer->wtr)
#define FST_WRITE_CHECK_SUMMER(writer) (writer->summer)
#define FST_WRITE_CHECK_SUMMER(writer) (writer->summer)
#ifdef __cplusplus
}
......
......@@ -20,12 +20,12 @@
extern "C" {
#endif
#include "indexFstCountingWriter.h"
#include "indexFstFile.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#define FST_BUILDER_NODE_IS_FINAL(bn) (bn->isFinal)
#define FST_BUILDER_NODE_TRANS_ISEMPTY(bn) (taosArrayGetSize(bn->trans) == 0)
#define FST_BUILDER_NODE_IS_FINAL(bn) (bn->isFinal)
#define FST_BUILDER_NODE_TRANS_ISEMPTY(bn) (taosArrayGetSize(bn->trans) == 0)
#define FST_BUILDER_NODE_FINALOUTPUT_ISZERO(bn) (bn->finalOutput == 0)
typedef struct FstTransition {
......@@ -46,7 +46,7 @@ FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src);
void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src);
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt,
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile' *wrt,
// CompiledAddr lastAddr, CompiledAddr startAddr);
bool fstBuilderNodeEqual(FstBuilderNode* n1, FstBuilderNode* n2);
......
......@@ -16,7 +16,7 @@
#define __INDEX_TFILE_H__
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstFile.h"
#include "indexInt.h"
#include "indexTfile.h"
#include "indexUtil.h"
......@@ -59,7 +59,7 @@ typedef struct TFileCache {
typedef struct TFileWriter {
FstBuilder* fb;
WriterCtx* ctx;
IFileCtx* ctx;
TFileHeader header;
uint32_t offset;
} TFileWriter;
......@@ -68,7 +68,7 @@ typedef struct TFileWriter {
typedef struct TFileReader {
T_REF_DECLARE()
Fst* fst;
WriterCtx* ctx;
IFileCtx* ctx;
TFileHeader header;
bool remove;
} TFileReader;
......@@ -103,7 +103,7 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* read
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName);
TFileReader* tfileReaderCreate(WriterCtx* ctx);
TFileReader* tfileReaderCreate(IFileCtx* ctx);
void tfileReaderDestroy(TFileReader* reader);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr);
void tfileReaderRef(TFileReader* reader);
......@@ -111,7 +111,7 @@ void tfileReaderUnRef(TFileReader* reader);
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type);
void tfileWriterClose(TFileWriter* tw);
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header);
TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header);
void tfileWriterDestroy(TFileWriter* tw);
int tfileWriterPut(TFileWriter* tw, void* data, bool order);
int tfileWriterFinish(TFileWriter* tw);
......
......@@ -39,7 +39,7 @@
#define INDEX_DATA_BIGINT_NULL 0x8000000000000000LL
#define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000LL // an NAN
#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL 0xFF
......@@ -614,7 +614,7 @@ static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
return ret;
END:
if (tw != NULL) {
writerCtxDestroy(tw->ctx, true);
idxFileCtxDestroy(tw->ctx, true);
taosMemoryFree(tw);
}
return -1;
......
......@@ -19,11 +19,11 @@
#include "tchecksum.h"
#include "tcoding.h"
static void fstPackDeltaIn(FstCountingWriter* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) {
static void fstPackDeltaIn(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) {
CompiledAddr deltaAddr = (transAddr == EMPTY_ADDRESS) ? EMPTY_ADDRESS : nodeAddr - transAddr;
fstCountingWriterPackUintIn(wrt, deltaAddr, nBytes);
idxFilePackUintIn(wrt, deltaAddr, nBytes);
}
static uint8_t fstPackDetla(FstCountingWriter* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr) {
static uint8_t fstPackDetla(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr) {
uint8_t nBytes = packDeltaSize(nodeAddr, transAddr);
fstPackDeltaIn(wrt, nodeAddr, transAddr, nBytes);
return nBytes;
......@@ -208,7 +208,7 @@ FstState fstStateCreate(State state) {
return fstStateDict[idx];
}
// compile
void fstStateCompileForOneTransNext(FstCountingWriter* w, CompiledAddr addr, uint8_t inp) {
void fstStateCompileForOneTransNext(IdxFstFile* w, CompiledAddr addr, uint8_t inp) {
FstState s = fstStateCreate(OneTransNext);
fstStateSetCommInput(&s, inp);
......@@ -216,21 +216,21 @@ void fstStateCompileForOneTransNext(FstCountingWriter* w, CompiledAddr addr, uin
uint8_t v = fstStateCommInput(&s, &null);
if (null) {
// w->write_all(&[inp])
fstCountingWriterWrite(w, &inp, 1);
idxFileWrite(w, &inp, 1);
}
fstCountingWriterWrite(w, &(s.val), 1);
idxFileWrite(w, &(s.val), 1);
// w->write_all(&[s.val])
return;
}
void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTransition* trn) {
void fstStateCompileForOneTrans(IdxFstFile* w, CompiledAddr addr, FstTransition* trn) {
Output out = trn->out;
uint8_t outPackSize = (out == 0 ? 0 : fstCountingWriterPackUint(w, out));
uint8_t outPackSize = (out == 0 ? 0 : idxFilePackUint(w, out));
uint8_t transPackSize = fstPackDetla(w, addr, trn->addr);
PackSizes packSizes = 0;
FST_SET_OUTPUT_PACK_SIZE(packSizes, outPackSize);
FST_SET_TRANSITION_PACK_SIZE(packSizes, transPackSize);
fstCountingWriterWrite(w, (char*)&packSizes, sizeof(packSizes));
idxFileWrite(w, (char*)&packSizes, sizeof(packSizes));
FstState st = fstStateCreate(OneTrans);
......@@ -239,12 +239,12 @@ void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTran
bool null = false;
uint8_t inp = fstStateCommInput(&st, &null);
if (null == true) {
fstCountingWriterWrite(w, (char*)&trn->inp, sizeof(trn->inp));
idxFileWrite(w, (char*)&trn->inp, sizeof(trn->inp));
}
fstCountingWriterWrite(w, (char*)(&(st.val)), sizeof(st.val));
idxFileWrite(w, (char*)(&(st.val)), sizeof(st.val));
return;
}
void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuilderNode* node) {
void fstStateCompileForAnyTrans(IdxFstFile* w, CompiledAddr addr, FstBuilderNode* node) {
int32_t sz = taosArrayGetSize(node->trans);
assert(sz <= 256);
......@@ -275,11 +275,11 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
if (anyOuts) {
if (FST_BUILDER_NODE_IS_FINAL(node)) {
fstCountingWriterPackUintIn(w, node->finalOutput, oSize);
idxFilePackUintIn(w, node->finalOutput, oSize);
}
for (int32_t i = sz - 1; i >= 0; i--) {
FstTransition* t = taosArrayGet(node->trans, i);
fstCountingWriterPackUintIn(w, t->out, oSize);
idxFilePackUintIn(w, t->out, oSize);
}
}
for (int32_t i = sz - 1; i >= 0; i--) {
......@@ -288,7 +288,7 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
}
for (int32_t i = sz - 1; i >= 0; i--) {
FstTransition* t = taosArrayGet(node->trans, i);
fstCountingWriterWrite(w, (char*)&t->inp, 1);
idxFileWrite(w, (char*)&t->inp, 1);
// fstPackDeltaIn(w, addr, t->addr, tSize);
}
if (sz > TRANS_INDEX_THRESHOLD) {
......@@ -306,10 +306,10 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
index[t->inp] = i;
// fstPackDeltaIn(w, addr, t->addr, tSize);
}
fstCountingWriterWrite(w, (char*)index, 256);
idxFileWrite(w, (char*)index, 256);
taosMemoryFree(index);
}
fstCountingWriterWrite(w, (char*)&packSizes, 1);
idxFileWrite(w, (char*)&packSizes, 1);
bool null = false;
fstStateStateNtrans(&st, &null);
if (null == true) {
......@@ -318,12 +318,12 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
// encoded in the state byte.
uint8_t v = 1;
if (sz == 256) {
fstCountingWriterWrite(w, (char*)&v, 1);
idxFileWrite(w, (char*)&v, 1);
} else {
fstCountingWriterWrite(w, (char*)&sz, 1);
idxFileWrite(w, (char*)&sz, 1);
}
}
fstCountingWriterWrite(w, (char*)(&(st.val)), 1);
idxFileWrite(w, (char*)(&(st.val)), 1);
return;
}
......@@ -753,7 +753,7 @@ bool fstNodeCompile(FstNode* node, void* w, CompiledAddr lastAddr, CompiledAddr
return true;
}
bool fstBuilderNodeCompileTo(FstBuilderNode* b, FstCountingWriter* wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
bool fstBuilderNodeCompileTo(FstBuilderNode* b, IdxFstFile* wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
return fstNodeCompile(NULL, wrt, lastAddr, startAddr, b);
}
......@@ -763,7 +763,7 @@ FstBuilder* fstBuilderCreate(void* w, FstType ty) {
return b;
}
b->wrt = fstCountingWriterCreate(w);
b->wrt = idxFileCreate(w);
b->unfinished = fstUnFinishedNodesCreate();
b->registry = fstRegistryCreate(10000, 2);
b->last = fstSliceCreate(NULL, 0);
......@@ -773,12 +773,12 @@ FstBuilder* fstBuilderCreate(void* w, FstType ty) {
char buf64[8] = {0};
void* pBuf64 = buf64;
taosEncodeFixedU64(&pBuf64, VERSION);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
idxFileWrite(b->wrt, buf64, sizeof(buf64));
pBuf64 = buf64;
memset(buf64, 0, sizeof(buf64));
taosEncodeFixedU64(&pBuf64, ty);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
idxFileWrite(b->wrt, buf64, sizeof(buf64));
return b;
}
......@@ -787,7 +787,7 @@ void fstBuilderDestroy(FstBuilder* b) {
return;
}
fstCountingWriterDestroy(b->wrt);
idxFileDestroy(b->wrt);
fstUnFinishedNodesDestroy(b->unfinished);
fstRegistryDestroy(b->registry);
fstSliceDestroy(&b->last);
......@@ -905,21 +905,19 @@ void* fstBuilderInsertInner(FstBuilder* b) {
void* pBuf64 = buf64;
taosEncodeFixedU64(&pBuf64, b->len);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
idxFileWrite(b->wrt, buf64, sizeof(buf64));
pBuf64 = buf64;
taosEncodeFixedU64(&pBuf64, rootAddr);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
idxFileWrite(b->wrt, buf64, sizeof(buf64));
char buf32[4] = {0};
void* pBuf32 = buf32;
uint32_t sum = fstCountingWriterMaskedCheckSum(b->wrt);
uint32_t sum = idxFileMaskedCheckSum(b->wrt);
taosEncodeFixedU32(&pBuf32, sum);
fstCountingWriterWrite(b->wrt, buf32, sizeof(buf32));
idxFileWrite(b->wrt, buf32, sizeof(buf32));
fstCountingWriterFlush(b->wrt);
// fstCountingWriterDestroy(b->wrt);
// b->wrt = NULL;
idxFileFlush(b->wrt);
return b->wrt;
}
void fstBuilderFinish(FstBuilder* b) { fstBuilderInsertInner(b); }
......
......@@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "indexFstCountingWriter.h"
#include "indexFstFile.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "os.h"
#include "tutil.h"
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
if (ctx->type == TFile) {
assert(len == taosWriteFile(ctx->file.pFile, buf, len));
} else {
......@@ -28,7 +28,7 @@ static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
ctx->offset += len;
return len;
}
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
int nRead = 0;
if (ctx->type == TFile) {
#ifdef USE_MMAP
......@@ -44,7 +44,7 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
return nRead;
}
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) {
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
int nRead = 0;
if (ctx->type == TFile) {
// tfLseek(ctx->file.pFile, offset, 0);
......@@ -61,7 +61,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
}
return nRead;
}
static int writeCtxGetSize(WriterCtx* ctx) {
static int writeCtxGetSize(IFileCtx* ctx) {
if (ctx->type == TFile) {
int64_t file_size = 0;
taosStatFile(ctx->file.buf, &file_size, NULL);
......@@ -69,7 +69,7 @@ static int writeCtxGetSize(WriterCtx* ctx) {
}
return 0;
}
static int writeCtxDoFlush(WriterCtx* ctx) {
static int idxFileCtxDoFlush(IFileCtx* ctx) {
if (ctx->type == TFile) {
// taosFsyncFile(ctx->file.pFile);
taosFsyncFile(ctx->file.pFile);
......@@ -80,8 +80,8 @@ static int writeCtxDoFlush(WriterCtx* ctx) {
return 1;
}
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
WriterCtx* ctx = taosMemoryCalloc(1, sizeof(WriterCtx));
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
if (ctx == NULL) {
return NULL;
}
......@@ -91,7 +91,6 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
// ugly code, refactor later
ctx->file.readOnly = readOnly;
if (readOnly == false) {
// ctx->file.pFile = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
taosFtruncateFile(ctx->file.pFile, 0);
int64_t file_size;
......@@ -118,10 +117,10 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
ctx->mem.capa = capacity;
}
ctx->write = writeCtxDoWrite;
ctx->read = writeCtxDoRead;
ctx->flush = writeCtxDoFlush;
ctx->readFrom = writeCtxDoReadFrom;
ctx->write = idxFileCtxDoWrite;
ctx->read = idxFileCtxDoRead;
ctx->flush = idxFileCtxDoFlush;
ctx->readFrom = idxFileCtxDoReadFrom;
ctx->size = writeCtxGetSize;
ctx->offset = 0;
......@@ -135,7 +134,7 @@ END:
taosMemoryFree(ctx);
return NULL;
}
void writerCtxDestroy(WriterCtx* ctx, bool remove) {
void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
if (ctx->type == TMemory) {
taosMemoryFree(ctx->mem.buf);
} else {
......@@ -149,9 +148,6 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
if (ctx->file.readOnly == false) {
int64_t file_size = 0;
taosStatFile(ctx->file.buf, &file_size, NULL);
// struct stat fstat;
// stat(ctx->file.buf, &fstat);
// indexError("write file size: %d", (int)(fstat.st_size));
}
if (remove) {
unlink(ctx->file.buf);
......@@ -160,30 +156,29 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
taosMemoryFree(ctx);
}
FstCountingWriter* fstCountingWriterCreate(void* wrt) {
FstCountingWriter* cw = taosMemoryCalloc(1, sizeof(FstCountingWriter));
IdxFstFile* idxFileCreate(void* wrt) {
IdxFstFile* cw = taosMemoryCalloc(1, sizeof(IdxFstFile));
if (cw == NULL) {
return NULL;
}
cw->wrt = wrt;
//(void *)(writerCtxCreate(TFile, readOnly));
return cw;
}
void fstCountingWriterDestroy(FstCountingWriter* cw) {
void idxFileDestroy(IdxFstFile* cw) {
// free wrt object: close fd or free mem
fstCountingWriterFlush(cw);
// writerCtxDestroy((WriterCtx *)(cw->wrt));
idxFileFlush(cw);
// idxFileCtxDestroy((IFileCtx *)(cw->wrt));
taosMemoryFree(cw);
}
int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len) {
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
if (write == NULL) {
return 0;
}
// update checksum
// write data to file/socket or mem
WriterCtx* ctx = write->wrt;
IFileCtx* ctx = write->wrt;
int nWrite = ctx->write(ctx, buf, len);
assert(nWrite == len);
......@@ -192,42 +187,41 @@ int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len)
write->summer = taosCalcChecksum(write->summer, buf, len);
return len;
}
int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) {
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
if (write == NULL) {
return 0;
}
WriterCtx* ctx = write->wrt;
int nRead = ctx->read(ctx, buf, len);
IFileCtx* ctx = write->wrt;
int nRead = ctx->read(ctx, buf, len);
// assert(nRead == len);
return nRead;
}
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) {
uint32_t idxFileMaskedCheckSum(IdxFstFile* write) {
// opt
return write->summer;
}
int fstCountingWriterFlush(FstCountingWriter* write) {
WriterCtx* ctx = write->wrt;
int idxFileFlush(IdxFstFile* write) {
IFileCtx* ctx = write->wrt;
ctx->flush(ctx);
// write->wtr->flush
return 1;
}
void fstCountingWriterPackUintIn(FstCountingWriter* writer, uint64_t n, uint8_t nBytes) {
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes) {
assert(1 <= nBytes && nBytes <= 8);
uint8_t* buf = taosMemoryCalloc(8, sizeof(uint8_t));
for (uint8_t i = 0; i < nBytes; i++) {
buf[i] = (uint8_t)n;
n = n >> 8;
}
fstCountingWriterWrite(writer, buf, nBytes);
idxFileWrite(writer, buf, nBytes);
taosMemoryFree(buf);
return;
}
uint8_t fstCountingWriterPackUint(FstCountingWriter* writer, uint64_t n) {
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n) {
uint8_t nBytes = packSize(n);
fstCountingWriterPackUintIn(writer, n, nBytes);
idxFilePackUintIn(writer, n, nBytes);
return nBytes;
}
......@@ -95,7 +95,7 @@ void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) {
}
}
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile *wrt, CompiledAddr lastAddr, CompiledAddr
// startAddr) {
// size_t sz = taosArrayGetSize(b->trans);
......
......@@ -16,7 +16,7 @@
#include "index.h"
#include "indexComm.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstFile.h"
#include "indexUtil.h"
#include "taosdef.h"
#include "taoserror.h"
......@@ -103,7 +103,7 @@ TFileCache* tfileCacheCreate(const char* path) {
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
char* file = taosArrayGetP(files, i);
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64);
IFileCtx* wc = idxFileCtxCreate(TFile, file, true, 1024 * 1024 * 64);
if (wc == NULL) {
indexError("failed to open index:%s", file);
goto End;
......@@ -175,7 +175,7 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
tfileReaderRef(reader);
return;
}
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
TFileReader* tfileReaderCreate(IFileCtx* ctx) {
TFileReader* reader = taosMemoryCalloc(1, sizeof(TFileReader));
if (reader == NULL) {
return NULL;
......@@ -216,7 +216,7 @@ void tfileReaderDestroy(TFileReader* reader) {
} else {
indexInfo("%s is not removed", reader->ctx->file.buf);
}
writerCtxDestroy(reader->ctx, reader->remove);
idxFileCtxDestroy(reader->ctx, reader->remove);
taosMemoryFree(reader);
}
......@@ -490,7 +490,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version);
// indexInfo("open write file name %s", fullname);
WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
IFileCtx* wcx = idxFileCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
if (wcx == NULL) {
return NULL;
}
......@@ -507,7 +507,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const c
char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version);
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
IFileCtx* wc = idxFileCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
if (wc == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
......@@ -518,7 +518,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const c
TFileReader* reader = tfileReaderCreate(wc);
return reader;
}
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header) {
TFileWriter* tw = taosMemoryCalloc(1, sizeof(TFileWriter));
if (tw == NULL) {
indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid);
......@@ -609,14 +609,14 @@ void tfileWriterClose(TFileWriter* tw) {
if (tw == NULL) {
return;
}
writerCtxDestroy(tw->ctx, false);
idxFileCtxDestroy(tw->ctx, false);
taosMemoryFree(tw);
}
void tfileWriterDestroy(TFileWriter* tw) {
if (tw == NULL) {
return;
}
writerCtxDestroy(tw->ctx, false);
idxFileCtxDestroy(tw->ctx, false);
taosMemoryFree(tw);
}
......@@ -892,8 +892,8 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
return 0;
}
static int tfileReaderLoadFst(TFileReader* reader) {
WriterCtx* ctx = reader->ctx;
int size = ctx->size(ctx);
IFileCtx* ctx = reader->ctx;
int size = ctx->size(ctx);
// current load fst into memory, refactor it later
int fstSize = size - reader->header.fstOffset - sizeof(tfileMagicNumber);
......@@ -919,7 +919,7 @@ static int tfileReaderLoadFst(TFileReader* reader) {
}
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
// TODO(yihao): opt later
WriterCtx* ctx = reader->ctx;
IFileCtx* ctx = reader->ctx;
// add block cache
char block[4096] = {0};
int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset);
......@@ -952,7 +952,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
}
static int tfileReaderVerify(TFileReader* reader) {
// just validate header and Footer, file corrupted also shuild be verified later
WriterCtx* ctx = reader->ctx;
IFileCtx* ctx = reader->ctx;
uint64_t tMagicNumber = 0;
......
......@@ -7,7 +7,6 @@
#include "index.h"
#include "indexCache.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
......@@ -20,7 +19,7 @@ class FstWriter {
public:
FstWriter() {
taosRemoveFile(fileName.c_str());
_wc = writerCtxCreate(TFile, fileName.c_str(), false, 64 * 1024 * 1024);
_wc = idxFileCtxCreate(TFile, fileName.c_str(), false, 64 * 1024 * 1024);
_b = fstBuilderCreate(_wc, 0);
}
bool Put(const std::string& key, uint64_t val) {
......@@ -38,25 +37,25 @@ class FstWriter {
fstBuilderFinish(_b);
fstBuilderDestroy(_b);
writerCtxDestroy(_wc, false);
idxFileCtxDestroy(_wc, false);
}
private:
FstBuilder* _b;
WriterCtx* _wc;
IFileCtx* _wc;
};
class FstReadMemory {
public:
FstReadMemory(int32_t size, const std::string& fileName = TD_TMP_DIR_PATH "tindex.tindex") {
_wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
_w = fstCountingWriterCreate(_wc);
_wc = idxFileCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
_w = idxFileCreate(_wc);
_size = size;
memset((void*)&_s, 0, sizeof(_s));
}
bool init() {
char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
int nRead = idxFileRead(_w, (uint8_t*)buf, _size);
if (nRead <= 0) {
return false;
}
......@@ -141,18 +140,18 @@ class FstReadMemory {
}
~FstReadMemory() {
fstCountingWriterDestroy(_w);
idxFileDestroy(_w);
fstDestroy(_fst);
fstSliceDestroy(&_s);
writerCtxDestroy(_wc, false);
idxFileCtxDestroy(_wc, false);
}
private:
FstCountingWriter* _w;
Fst* _fst;
FstSlice _s;
WriterCtx* _wc;
int32_t _size;
IdxFstFile* _w;
Fst* _fst;
FstSlice _s;
IFileCtx* _wc;
int32_t _size;
};
#define L 100
......
......@@ -8,7 +8,6 @@
#include "index.h"
#include "indexCache.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
......@@ -40,7 +39,7 @@ static void EnvCleanup() {}
class FstWriter {
public:
FstWriter() {
_wc = writerCtxCreate(TFile, tindex, false, 64 * 1024 * 1024);
_wc = idxFileCtxCreate(TFile, tindex, false, 64 * 1024 * 1024);
_b = fstBuilderCreate(_wc, 0);
}
bool Put(const std::string& key, uint64_t val) {
......@@ -58,25 +57,25 @@ class FstWriter {
fstBuilderFinish(_b);
fstBuilderDestroy(_b);
writerCtxDestroy(_wc, false);
idxFileCtxDestroy(_wc, false);
}
private:
FstBuilder* _b;
WriterCtx* _wc;
IFileCtx* _wc;
};
class FstReadMemory {
public:
FstReadMemory(size_t size) {
_wc = writerCtxCreate(TFile, tindex, true, 64 * 1024);
_w = fstCountingWriterCreate(_wc);
_wc = idxFileCtxCreate(TFile, tindex, true, 64 * 1024);
_w = idxFileCreate(_wc);
_size = size;
memset((void*)&_s, 0, sizeof(_s));
}
bool init() {
char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
int nRead = idxFileRead(_w, (uint8_t*)buf, _size);
if (nRead <= 0) {
return false;
}
......@@ -130,18 +129,18 @@ class FstReadMemory {
}
~FstReadMemory() {
fstCountingWriterDestroy(_w);
idxFileDestroy(_w);
fstDestroy(_fst);
fstSliceDestroy(&_s);
writerCtxDestroy(_wc, false);
idxFileCtxDestroy(_wc, false);
}
private:
FstCountingWriter* _w;
Fst* _fst;
FstSlice _s;
WriterCtx* _wc;
size_t _size;
IdxFstFile* _w;
Fst* _fst;
FstSlice _s;
IFileCtx* _wc;
size_t _size;
};
class FstWriterEnv : public ::testing::Test {
......
......@@ -20,7 +20,6 @@
#include "index.h"
#include "indexCache.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
......@@ -51,7 +50,7 @@ class DebugInfo {
class FstWriter {
public:
FstWriter() {
_wc = writerCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", false, 64 * 1024 * 1024);
_wc = idxFileCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", false, 64 * 1024 * 1024);
_b = fstBuilderCreate(NULL, 0);
}
bool Put(const std::string& key, uint64_t val) {
......@@ -64,25 +63,25 @@ class FstWriter {
fstBuilderFinish(_b);
fstBuilderDestroy(_b);
writerCtxDestroy(_wc, false);
idxFileCtxDestroy(_wc, false);
}
private:
FstBuilder* _b;
WriterCtx* _wc;
IFileCtx* _wc;
};
class FstReadMemory {
public:
FstReadMemory(size_t size) {
_wc = writerCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", true, 64 * 1024);
_w = fstCountingWriterCreate(_wc);
_wc = idxFileCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", true, 64 * 1024);
_w = idxFileCreate(_wc);
_size = size;
memset((void*)&_s, 0, sizeof(_s));
}
bool init() {
char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
int nRead = idxFileRead(_w, (uint8_t*)buf, _size);
if (nRead <= 0) {
return false;
}
......@@ -124,18 +123,18 @@ class FstReadMemory {
}
~FstReadMemory() {
fstCountingWriterDestroy(_w);
idxFileDestroy(_w);
fstDestroy(_fst);
fstSliceDestroy(&_s);
writerCtxDestroy(_wc, true);
idxFileCtxDestroy(_wc, true);
}
private:
FstCountingWriter* _w;
Fst* _fst;
FstSlice _s;
WriterCtx* _wc;
size_t _size;
IdxFstFile* _w;
Fst* _fst;
FstSlice _s;
IFileCtx* _wc;
size_t _size;
};
#define L 100
......@@ -392,13 +391,13 @@ class TFileObj {
fileName_ = path;
WriterCtx* ctx = writerCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024);
IFileCtx* ctx = idxFileCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024);
writer_ = tfileWriterCreate(ctx, &header);
return writer_ != NULL ? true : false;
}
bool InitReader() {
WriterCtx* ctx = writerCtxCreate(TFile, fileName_.c_str(), true, 64 * 1024 * 1024);
IFileCtx* ctx = idxFileCtxCreate(TFile, fileName_.c_str(), true, 64 * 1024 * 1024);
reader_ = tfileReaderCreate(ctx);
return reader_ != NULL ? true : false;
}
......
......@@ -7,7 +7,6 @@
#include "index.h"
#include "indexCache.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
......
......@@ -8,7 +8,6 @@
#include "indexCache.h"
#include "indexComm.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
......
......@@ -592,6 +592,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Invalid index file")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册