提交 1b27d3e3 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11274-3.0

...@@ -627,6 +627,8 @@ int32_t* taosGetErrno(); ...@@ -627,6 +627,8 @@ int32_t* taosGetErrno();
//index //index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
#define TSDB_CODE_INDEX_INVALID_FILE TAOS_DEF_ERROR_CODE(0, 0x3201)
//tmq //tmq
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000) #define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
......
...@@ -21,7 +21,7 @@ extern "C" { ...@@ -21,7 +21,7 @@ extern "C" {
#endif #endif
#include "indexFstAutomation.h" #include "indexFstAutomation.h"
#include "indexFstCountingWriter.h" #include "indexFstFile.h"
#include "indexFstNode.h" #include "indexFstNode.h"
#include "indexFstRegistry.h" #include "indexFstRegistry.h"
#include "indexFstUtil.h" #include "indexFstUtil.h"
...@@ -90,8 +90,8 @@ FstBuilderNode* fstUnFinishedNodesPopEmpty(FstUnFinishedNodes* nodes); ...@@ -90,8 +90,8 @@ FstBuilderNode* fstUnFinishedNodesPopEmpty(FstUnFinishedNodes* nodes);
uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes* node, FstSlice bs, Output in, Output* out); uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes* node, FstSlice bs, Output in, Output* out);
typedef struct FstBuilder { typedef struct FstBuilder {
FstCountingWriter* wrt; // The FST raw data is written directly to `wtr`. IdxFstFile* wrt; // The FST raw data is written directly to `wtr`.
FstUnFinishedNodes* unfinished; // The stack of unfinished nodes FstUnFinishedNodes* unfinished; // The stack of unfinished nodes
FstRegistry* registry; // A map of finished nodes. FstRegistry* registry; // A map of finished nodes.
FstSlice last; // The last word added FstSlice last; // The last word added
CompiledAddr lastAddr; // The address of the last compiled node CompiledAddr lastAddr; // The address of the last compiled node
...@@ -125,9 +125,9 @@ FstState fstStateCreateFrom(FstSlice* data, CompiledAddr addr); ...@@ -125,9 +125,9 @@ FstState fstStateCreateFrom(FstSlice* data, CompiledAddr addr);
FstState fstStateCreate(State state); FstState fstStateCreate(State state);
// compile // compile
void fstStateCompileForOneTransNext(FstCountingWriter* w, CompiledAddr addr, uint8_t inp); void fstStateCompileForOneTransNext(IdxFstFile* w, CompiledAddr addr, uint8_t inp);
void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTransition* trn); void fstStateCompileForOneTrans(IdxFstFile* w, CompiledAddr addr, FstTransition* trn);
void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuilderNode* node); void fstStateCompileForAnyTrans(IdxFstFile* w, CompiledAddr addr, FstBuilderNode* node);
// set_comm_input // set_comm_input
void fstStateSetCommInput(FstState* state, uint8_t inp); void fstStateSetCommInput(FstState* state, uint8_t inp);
...@@ -282,7 +282,7 @@ FStmSt* stmBuilderIntoStm(FStmBuilder* sb); ...@@ -282,7 +282,7 @@ FStmSt* stmBuilderIntoStm(FStmBuilder* sb);
bool fstVerify(Fst* fst); bool fstVerify(Fst* fst);
// refactor this function // refactor this function
bool fstBuilderNodeCompileTo(FstBuilderNode* b, FstCountingWriter* wrt, CompiledAddr lastAddr, CompiledAddr startAddr); bool fstBuilderNodeCompileTo(FstBuilderNode* b, IdxFstFile* wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
typedef struct StreamState { typedef struct StreamState {
FstNode* node; FstNode* node;
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __INDEX_FST_COUNTING_WRITER_H__ #ifndef __INDEX_FST_FILE_H__
#define __INDEX_FST_COUNTING_WRITER_H__ #define __INDEX_FST_FILE_H__
#include "indexInt.h" #include "indexInt.h"
...@@ -29,65 +29,65 @@ extern "C" { ...@@ -29,65 +29,65 @@ extern "C" {
static char tmpFile[] = "./index"; static char tmpFile[] = "./index";
typedef enum WriterType { TMemory, TFile } WriterType; typedef enum WriterType { TMemory, TFile } WriterType;
typedef struct WriterCtx { typedef struct IFileCtx {
int (*write)(struct WriterCtx* ctx, uint8_t* buf, int len); int (*write)(struct IFileCtx* ctx, uint8_t* buf, int len);
int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len); int (*read)(struct IFileCtx* ctx, uint8_t* buf, int len);
int (*flush)(struct WriterCtx* ctx); int (*flush)(struct IFileCtx* ctx);
int (*readFrom)(struct WriterCtx* ctx, uint8_t* buf, int len, int32_t offset); int (*readFrom)(struct IFileCtx* ctx, uint8_t* buf, int len, int32_t offset);
int (*size)(struct WriterCtx* ctx); int (*size)(struct IFileCtx* ctx);
WriterType type; WriterType type;
union { union {
struct { struct {
TdFilePtr pFile; TdFilePtr pFile;
bool readOnly; bool readOnly;
char buf[256]; char buf[256];
int size; int64_t size;
#ifdef USE_MMAP #ifdef USE_MMAP
char* ptr; char* ptr;
#endif #endif
} file; } file;
struct { struct {
int32_t capa; int32_t cap;
char* buf; char* buf;
} mem; } mem;
}; };
int32_t offset; int32_t offset;
int32_t limit; int32_t limit;
} WriterCtx; } IFileCtx;
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len); static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len);
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len); static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len);
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset); static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset);
static int writeCtxDoFlush(WriterCtx* ctx); static int idxFileCtxDoFlush(IFileCtx* ctx);
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity); IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity);
void writerCtxDestroy(WriterCtx* w, bool remove); void idxFileCtxDestroy(IFileCtx* w, bool remove);
typedef uint32_t CheckSummer; typedef uint32_t CheckSummer;
typedef struct FstCountingWriter { typedef struct IdxFstFile {
void* wrt; // wrap any writer that counts and checksum bytes written void* wrt; // wrap any writer that counts and checksum bytes written
uint64_t count; uint64_t count;
CheckSummer summer; CheckSummer summer;
} FstCountingWriter; } IdxFstFile;
int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len); int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len);
int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len); int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len);
int fstCountingWriterFlush(FstCountingWriter* write); int idxFileFlush(IdxFstFile* write);
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write); uint32_t idxFileMaskedCheckSum(IdxFstFile* write);
FstCountingWriter* fstCountingWriterCreate(void* wtr); IdxFstFile* idxFileCreate(void* wtr);
void fstCountingWriterDestroy(FstCountingWriter* w); void idxFileDestroy(IdxFstFile* w);
void fstCountingWriterPackUintIn(FstCountingWriter* writer, uint64_t n, uint8_t nBytes); void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes);
uint8_t fstCountingWriterPackUint(FstCountingWriter* writer, uint64_t n); uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n);
#define FST_WRITER_COUNT(writer) (writer->count) #define FST_WRITER_COUNT(writer) (writer->count)
#define FST_WRITER_INTER_WRITER(writer) (writer->wtr) #define FST_WRITER_INTER_WRITER(writer) (writer->wtr)
#define FST_WRITE_CHECK_SUMMER(writer) (writer->summer) #define FST_WRITE_CHECK_SUMMER(writer) (writer->summer)
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,12 +20,12 @@ ...@@ -20,12 +20,12 @@
extern "C" { extern "C" {
#endif #endif
#include "indexFstCountingWriter.h" #include "indexFstFile.h"
#include "indexFstUtil.h" #include "indexFstUtil.h"
#include "indexInt.h" #include "indexInt.h"
#define FST_BUILDER_NODE_IS_FINAL(bn) (bn->isFinal) #define FST_BUILDER_NODE_IS_FINAL(bn) (bn->isFinal)
#define FST_BUILDER_NODE_TRANS_ISEMPTY(bn) (taosArrayGetSize(bn->trans) == 0) #define FST_BUILDER_NODE_TRANS_ISEMPTY(bn) (taosArrayGetSize(bn->trans) == 0)
#define FST_BUILDER_NODE_FINALOUTPUT_ISZERO(bn) (bn->finalOutput == 0) #define FST_BUILDER_NODE_FINALOUTPUT_ISZERO(bn) (bn->finalOutput == 0)
typedef struct FstTransition { typedef struct FstTransition {
...@@ -46,7 +46,7 @@ FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src); ...@@ -46,7 +46,7 @@ FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src);
void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src); void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src);
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, // bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile' *wrt,
// CompiledAddr lastAddr, CompiledAddr startAddr); // CompiledAddr lastAddr, CompiledAddr startAddr);
bool fstBuilderNodeEqual(FstBuilderNode* n1, FstBuilderNode* n2); bool fstBuilderNodeEqual(FstBuilderNode* n1, FstBuilderNode* n2);
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#define __INDEX_TFILE_H__ #define __INDEX_TFILE_H__
#include "indexFst.h" #include "indexFst.h"
#include "indexFstCountingWriter.h" #include "indexFstFile.h"
#include "indexInt.h" #include "indexInt.h"
#include "indexTfile.h" #include "indexTfile.h"
#include "indexUtil.h" #include "indexUtil.h"
...@@ -59,7 +59,7 @@ typedef struct TFileCache { ...@@ -59,7 +59,7 @@ typedef struct TFileCache {
typedef struct TFileWriter { typedef struct TFileWriter {
FstBuilder* fb; FstBuilder* fb;
WriterCtx* ctx; IFileCtx* ctx;
TFileHeader header; TFileHeader header;
uint32_t offset; uint32_t offset;
} TFileWriter; } TFileWriter;
...@@ -68,7 +68,7 @@ typedef struct TFileWriter { ...@@ -68,7 +68,7 @@ typedef struct TFileWriter {
typedef struct TFileReader { typedef struct TFileReader {
T_REF_DECLARE() T_REF_DECLARE()
Fst* fst; Fst* fst;
WriterCtx* ctx; IFileCtx* ctx;
TFileHeader header; TFileHeader header;
bool remove; bool remove;
} TFileReader; } TFileReader;
...@@ -103,7 +103,7 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* read ...@@ -103,7 +103,7 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* read
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName); TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName); TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName);
TFileReader* tfileReaderCreate(WriterCtx* ctx); TFileReader* tfileReaderCreate(IFileCtx* ctx);
void tfileReaderDestroy(TFileReader* reader); void tfileReaderDestroy(TFileReader* reader);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr); int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr);
void tfileReaderRef(TFileReader* reader); void tfileReaderRef(TFileReader* reader);
...@@ -111,7 +111,7 @@ void tfileReaderUnRef(TFileReader* reader); ...@@ -111,7 +111,7 @@ void tfileReaderUnRef(TFileReader* reader);
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type); TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type);
void tfileWriterClose(TFileWriter* tw); void tfileWriterClose(TFileWriter* tw);
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header); TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header);
void tfileWriterDestroy(TFileWriter* tw); void tfileWriterDestroy(TFileWriter* tw);
int tfileWriterPut(TFileWriter* tw, void* data, bool order); int tfileWriterPut(TFileWriter* tw, void* data, bool order);
int tfileWriterFinish(TFileWriter* tw); int tfileWriterFinish(TFileWriter* tw);
......
...@@ -39,7 +39,7 @@ ...@@ -39,7 +39,7 @@
#define INDEX_DATA_BIGINT_NULL 0x8000000000000000LL #define INDEX_DATA_BIGINT_NULL 0x8000000000000000LL
#define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL #define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN #define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000LL // an NAN #define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000LL // an NAN
#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF #define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL 0xFF #define INDEX_DATA_BINARY_NULL 0xFF
...@@ -614,7 +614,7 @@ static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { ...@@ -614,7 +614,7 @@ static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
return ret; return ret;
END: END:
if (tw != NULL) { if (tw != NULL) {
writerCtxDestroy(tw->ctx, true); idxFileCtxDestroy(tw->ctx, true);
taosMemoryFree(tw); taosMemoryFree(tw);
} }
return -1; return -1;
......
...@@ -19,11 +19,11 @@ ...@@ -19,11 +19,11 @@
#include "tchecksum.h" #include "tchecksum.h"
#include "tcoding.h" #include "tcoding.h"
static void fstPackDeltaIn(FstCountingWriter* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) { static void fstPackDeltaIn(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) {
CompiledAddr deltaAddr = (transAddr == EMPTY_ADDRESS) ? EMPTY_ADDRESS : nodeAddr - transAddr; CompiledAddr deltaAddr = (transAddr == EMPTY_ADDRESS) ? EMPTY_ADDRESS : nodeAddr - transAddr;
fstCountingWriterPackUintIn(wrt, deltaAddr, nBytes); idxFilePackUintIn(wrt, deltaAddr, nBytes);
} }
static uint8_t fstPackDetla(FstCountingWriter* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr) { static uint8_t fstPackDetla(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr) {
uint8_t nBytes = packDeltaSize(nodeAddr, transAddr); uint8_t nBytes = packDeltaSize(nodeAddr, transAddr);
fstPackDeltaIn(wrt, nodeAddr, transAddr, nBytes); fstPackDeltaIn(wrt, nodeAddr, transAddr, nBytes);
return nBytes; return nBytes;
...@@ -208,7 +208,7 @@ FstState fstStateCreate(State state) { ...@@ -208,7 +208,7 @@ FstState fstStateCreate(State state) {
return fstStateDict[idx]; return fstStateDict[idx];
} }
// compile // compile
void fstStateCompileForOneTransNext(FstCountingWriter* w, CompiledAddr addr, uint8_t inp) { void fstStateCompileForOneTransNext(IdxFstFile* w, CompiledAddr addr, uint8_t inp) {
FstState s = fstStateCreate(OneTransNext); FstState s = fstStateCreate(OneTransNext);
fstStateSetCommInput(&s, inp); fstStateSetCommInput(&s, inp);
...@@ -216,21 +216,21 @@ void fstStateCompileForOneTransNext(FstCountingWriter* w, CompiledAddr addr, uin ...@@ -216,21 +216,21 @@ void fstStateCompileForOneTransNext(FstCountingWriter* w, CompiledAddr addr, uin
uint8_t v = fstStateCommInput(&s, &null); uint8_t v = fstStateCommInput(&s, &null);
if (null) { if (null) {
// w->write_all(&[inp]) // w->write_all(&[inp])
fstCountingWriterWrite(w, &inp, 1); idxFileWrite(w, &inp, 1);
} }
fstCountingWriterWrite(w, &(s.val), 1); idxFileWrite(w, &(s.val), 1);
// w->write_all(&[s.val]) // w->write_all(&[s.val])
return; return;
} }
void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTransition* trn) { void fstStateCompileForOneTrans(IdxFstFile* w, CompiledAddr addr, FstTransition* trn) {
Output out = trn->out; Output out = trn->out;
uint8_t outPackSize = (out == 0 ? 0 : fstCountingWriterPackUint(w, out)); uint8_t outPackSize = (out == 0 ? 0 : idxFilePackUint(w, out));
uint8_t transPackSize = fstPackDetla(w, addr, trn->addr); uint8_t transPackSize = fstPackDetla(w, addr, trn->addr);
PackSizes packSizes = 0; PackSizes packSizes = 0;
FST_SET_OUTPUT_PACK_SIZE(packSizes, outPackSize); FST_SET_OUTPUT_PACK_SIZE(packSizes, outPackSize);
FST_SET_TRANSITION_PACK_SIZE(packSizes, transPackSize); FST_SET_TRANSITION_PACK_SIZE(packSizes, transPackSize);
fstCountingWriterWrite(w, (char*)&packSizes, sizeof(packSizes)); idxFileWrite(w, (char*)&packSizes, sizeof(packSizes));
FstState st = fstStateCreate(OneTrans); FstState st = fstStateCreate(OneTrans);
...@@ -239,12 +239,12 @@ void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTran ...@@ -239,12 +239,12 @@ void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTran
bool null = false; bool null = false;
uint8_t inp = fstStateCommInput(&st, &null); uint8_t inp = fstStateCommInput(&st, &null);
if (null == true) { if (null == true) {
fstCountingWriterWrite(w, (char*)&trn->inp, sizeof(trn->inp)); idxFileWrite(w, (char*)&trn->inp, sizeof(trn->inp));
} }
fstCountingWriterWrite(w, (char*)(&(st.val)), sizeof(st.val)); idxFileWrite(w, (char*)(&(st.val)), sizeof(st.val));
return; return;
} }
void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuilderNode* node) { void fstStateCompileForAnyTrans(IdxFstFile* w, CompiledAddr addr, FstBuilderNode* node) {
int32_t sz = taosArrayGetSize(node->trans); int32_t sz = taosArrayGetSize(node->trans);
assert(sz <= 256); assert(sz <= 256);
...@@ -275,11 +275,11 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil ...@@ -275,11 +275,11 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
if (anyOuts) { if (anyOuts) {
if (FST_BUILDER_NODE_IS_FINAL(node)) { if (FST_BUILDER_NODE_IS_FINAL(node)) {
fstCountingWriterPackUintIn(w, node->finalOutput, oSize); idxFilePackUintIn(w, node->finalOutput, oSize);
} }
for (int32_t i = sz - 1; i >= 0; i--) { for (int32_t i = sz - 1; i >= 0; i--) {
FstTransition* t = taosArrayGet(node->trans, i); FstTransition* t = taosArrayGet(node->trans, i);
fstCountingWriterPackUintIn(w, t->out, oSize); idxFilePackUintIn(w, t->out, oSize);
} }
} }
for (int32_t i = sz - 1; i >= 0; i--) { for (int32_t i = sz - 1; i >= 0; i--) {
...@@ -288,7 +288,7 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil ...@@ -288,7 +288,7 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
} }
for (int32_t i = sz - 1; i >= 0; i--) { for (int32_t i = sz - 1; i >= 0; i--) {
FstTransition* t = taosArrayGet(node->trans, i); FstTransition* t = taosArrayGet(node->trans, i);
fstCountingWriterWrite(w, (char*)&t->inp, 1); idxFileWrite(w, (char*)&t->inp, 1);
// fstPackDeltaIn(w, addr, t->addr, tSize); // fstPackDeltaIn(w, addr, t->addr, tSize);
} }
if (sz > TRANS_INDEX_THRESHOLD) { if (sz > TRANS_INDEX_THRESHOLD) {
...@@ -306,10 +306,10 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil ...@@ -306,10 +306,10 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
index[t->inp] = i; index[t->inp] = i;
// fstPackDeltaIn(w, addr, t->addr, tSize); // fstPackDeltaIn(w, addr, t->addr, tSize);
} }
fstCountingWriterWrite(w, (char*)index, 256); idxFileWrite(w, (char*)index, 256);
taosMemoryFree(index); taosMemoryFree(index);
} }
fstCountingWriterWrite(w, (char*)&packSizes, 1); idxFileWrite(w, (char*)&packSizes, 1);
bool null = false; bool null = false;
fstStateStateNtrans(&st, &null); fstStateStateNtrans(&st, &null);
if (null == true) { if (null == true) {
...@@ -318,12 +318,12 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil ...@@ -318,12 +318,12 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
// encoded in the state byte. // encoded in the state byte.
uint8_t v = 1; uint8_t v = 1;
if (sz == 256) { if (sz == 256) {
fstCountingWriterWrite(w, (char*)&v, 1); idxFileWrite(w, (char*)&v, 1);
} else { } else {
fstCountingWriterWrite(w, (char*)&sz, 1); idxFileWrite(w, (char*)&sz, 1);
} }
} }
fstCountingWriterWrite(w, (char*)(&(st.val)), 1); idxFileWrite(w, (char*)(&(st.val)), 1);
return; return;
} }
...@@ -753,7 +753,7 @@ bool fstNodeCompile(FstNode* node, void* w, CompiledAddr lastAddr, CompiledAddr ...@@ -753,7 +753,7 @@ bool fstNodeCompile(FstNode* node, void* w, CompiledAddr lastAddr, CompiledAddr
return true; return true;
} }
bool fstBuilderNodeCompileTo(FstBuilderNode* b, FstCountingWriter* wrt, CompiledAddr lastAddr, CompiledAddr startAddr) { bool fstBuilderNodeCompileTo(FstBuilderNode* b, IdxFstFile* wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
return fstNodeCompile(NULL, wrt, lastAddr, startAddr, b); return fstNodeCompile(NULL, wrt, lastAddr, startAddr, b);
} }
...@@ -763,7 +763,7 @@ FstBuilder* fstBuilderCreate(void* w, FstType ty) { ...@@ -763,7 +763,7 @@ FstBuilder* fstBuilderCreate(void* w, FstType ty) {
return b; return b;
} }
b->wrt = fstCountingWriterCreate(w); b->wrt = idxFileCreate(w);
b->unfinished = fstUnFinishedNodesCreate(); b->unfinished = fstUnFinishedNodesCreate();
b->registry = fstRegistryCreate(10000, 2); b->registry = fstRegistryCreate(10000, 2);
b->last = fstSliceCreate(NULL, 0); b->last = fstSliceCreate(NULL, 0);
...@@ -773,12 +773,12 @@ FstBuilder* fstBuilderCreate(void* w, FstType ty) { ...@@ -773,12 +773,12 @@ FstBuilder* fstBuilderCreate(void* w, FstType ty) {
char buf64[8] = {0}; char buf64[8] = {0};
void* pBuf64 = buf64; void* pBuf64 = buf64;
taosEncodeFixedU64(&pBuf64, VERSION); taosEncodeFixedU64(&pBuf64, VERSION);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64)); idxFileWrite(b->wrt, buf64, sizeof(buf64));
pBuf64 = buf64; pBuf64 = buf64;
memset(buf64, 0, sizeof(buf64)); memset(buf64, 0, sizeof(buf64));
taosEncodeFixedU64(&pBuf64, ty); taosEncodeFixedU64(&pBuf64, ty);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64)); idxFileWrite(b->wrt, buf64, sizeof(buf64));
return b; return b;
} }
...@@ -787,7 +787,7 @@ void fstBuilderDestroy(FstBuilder* b) { ...@@ -787,7 +787,7 @@ void fstBuilderDestroy(FstBuilder* b) {
return; return;
} }
fstCountingWriterDestroy(b->wrt); idxFileDestroy(b->wrt);
fstUnFinishedNodesDestroy(b->unfinished); fstUnFinishedNodesDestroy(b->unfinished);
fstRegistryDestroy(b->registry); fstRegistryDestroy(b->registry);
fstSliceDestroy(&b->last); fstSliceDestroy(&b->last);
...@@ -905,21 +905,19 @@ void* fstBuilderInsertInner(FstBuilder* b) { ...@@ -905,21 +905,19 @@ void* fstBuilderInsertInner(FstBuilder* b) {
void* pBuf64 = buf64; void* pBuf64 = buf64;
taosEncodeFixedU64(&pBuf64, b->len); taosEncodeFixedU64(&pBuf64, b->len);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64)); idxFileWrite(b->wrt, buf64, sizeof(buf64));
pBuf64 = buf64; pBuf64 = buf64;
taosEncodeFixedU64(&pBuf64, rootAddr); taosEncodeFixedU64(&pBuf64, rootAddr);
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64)); idxFileWrite(b->wrt, buf64, sizeof(buf64));
char buf32[4] = {0}; char buf32[4] = {0};
void* pBuf32 = buf32; void* pBuf32 = buf32;
uint32_t sum = fstCountingWriterMaskedCheckSum(b->wrt); uint32_t sum = idxFileMaskedCheckSum(b->wrt);
taosEncodeFixedU32(&pBuf32, sum); taosEncodeFixedU32(&pBuf32, sum);
fstCountingWriterWrite(b->wrt, buf32, sizeof(buf32)); idxFileWrite(b->wrt, buf32, sizeof(buf32));
fstCountingWriterFlush(b->wrt); idxFileFlush(b->wrt);
// fstCountingWriterDestroy(b->wrt);
// b->wrt = NULL;
return b->wrt; return b->wrt;
} }
void fstBuilderFinish(FstBuilder* b) { fstBuilderInsertInner(b); } void fstBuilderFinish(FstBuilder* b) { fstBuilderInsertInner(b); }
......
...@@ -61,9 +61,10 @@ void dfaBuilderDestroy(FstDfaBuilder *builder) { ...@@ -61,9 +61,10 @@ void dfaBuilderDestroy(FstDfaBuilder *builder) {
pIter = taosHashIterate(builder->cache, pIter); pIter = taosHashIterate(builder->cache, pIter);
} }
taosHashCleanup(builder->cache); taosHashCleanup(builder->cache);
taosMemoryFree(builder);
} }
FstDfa *dfaBuilderBuild(FstDfaBuilder *builder) { FstDfa *dfaBuilder(FstDfaBuilder *builder) {
uint32_t sz = taosArrayGetSize(builder->dfa->insts); uint32_t sz = taosArrayGetSize(builder->dfa->insts);
FstSparseSet *cur = sparSetCreate(sz); FstSparseSet *cur = sparSetCreate(sz);
FstSparseSet *nxt = sparSetCreate(sz); FstSparseSet *nxt = sparSetCreate(sz);
......
...@@ -13,13 +13,13 @@ ...@@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "indexFstCountingWriter.h" #include "indexFstFile.h"
#include "indexFstUtil.h" #include "indexFstUtil.h"
#include "indexInt.h" #include "indexInt.h"
#include "os.h" #include "os.h"
#include "tutil.h" #include "tutil.h"
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) { static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
if (ctx->type == TFile) { if (ctx->type == TFile) {
assert(len == taosWriteFile(ctx->file.pFile, buf, len)); assert(len == taosWriteFile(ctx->file.pFile, buf, len));
} else { } else {
...@@ -28,7 +28,7 @@ static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) { ...@@ -28,7 +28,7 @@ static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
ctx->offset += len; ctx->offset += len;
return len; return len;
} }
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) { static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
int nRead = 0; int nRead = 0;
if (ctx->type == TFile) { if (ctx->type == TFile) {
#ifdef USE_MMAP #ifdef USE_MMAP
...@@ -44,7 +44,7 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) { ...@@ -44,7 +44,7 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
return nRead; return nRead;
} }
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) { static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
int nRead = 0; int nRead = 0;
if (ctx->type == TFile) { if (ctx->type == TFile) {
// tfLseek(ctx->file.pFile, offset, 0); // tfLseek(ctx->file.pFile, offset, 0);
...@@ -61,7 +61,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off ...@@ -61,7 +61,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
} }
return nRead; return nRead;
} }
static int writeCtxGetSize(WriterCtx* ctx) { static int idxFileCtxGetSize(IFileCtx* ctx) {
if (ctx->type == TFile) { if (ctx->type == TFile) {
int64_t file_size = 0; int64_t file_size = 0;
taosStatFile(ctx->file.buf, &file_size, NULL); taosStatFile(ctx->file.buf, &file_size, NULL);
...@@ -69,7 +69,7 @@ static int writeCtxGetSize(WriterCtx* ctx) { ...@@ -69,7 +69,7 @@ static int writeCtxGetSize(WriterCtx* ctx) {
} }
return 0; return 0;
} }
static int writeCtxDoFlush(WriterCtx* ctx) { static int idxFileCtxDoFlush(IFileCtx* ctx) {
if (ctx->type == TFile) { if (ctx->type == TFile) {
// taosFsyncFile(ctx->file.pFile); // taosFsyncFile(ctx->file.pFile);
taosFsyncFile(ctx->file.pFile); taosFsyncFile(ctx->file.pFile);
...@@ -80,8 +80,8 @@ static int writeCtxDoFlush(WriterCtx* ctx) { ...@@ -80,8 +80,8 @@ static int writeCtxDoFlush(WriterCtx* ctx) {
return 1; return 1;
} }
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) { IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
WriterCtx* ctx = taosMemoryCalloc(1, sizeof(WriterCtx)); IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
if (ctx == NULL) { if (ctx == NULL) {
return NULL; return NULL;
} }
...@@ -90,39 +90,36 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int ...@@ -90,39 +90,36 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
if (ctx->type == TFile) { if (ctx->type == TFile) {
// ugly code, refactor later // ugly code, refactor later
ctx->file.readOnly = readOnly; ctx->file.readOnly = readOnly;
memcpy(ctx->file.buf, path, strlen(path));
if (readOnly == false) { if (readOnly == false) {
// ctx->file.pFile = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
taosFtruncateFile(ctx->file.pFile, 0); taosFtruncateFile(ctx->file.pFile, 0);
int64_t file_size; taosStatFile(path, &ctx->file.size, NULL);
taosStatFile(path, &file_size, NULL); // ctx->file.size = (int)size;
ctx->file.size = (int)file_size;
} else { } else {
// ctx->file.pFile = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.pFile = taosOpenFile(path, TD_FILE_READ); ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
int64_t file_size = 0; int64_t size = 0;
taosFStatFile(ctx->file.pFile, &file_size, NULL); taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
ctx->file.size = (int)file_size; ctx->file.size = (int)size;
#ifdef USE_MMAP #ifdef USE_MMAP
ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size); ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size);
#endif #endif
} }
memcpy(ctx->file.buf, path, strlen(path));
if (ctx->file.pFile == NULL) { if (ctx->file.pFile == NULL) {
indexError("failed to open file, error %d", errno); indexError("failed to open file, error %d", errno);
goto END; goto END;
} }
} else if (ctx->type == TMemory) { } else if (ctx->type == TMemory) {
ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity); ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
ctx->mem.capa = capacity; ctx->mem.cap = capacity;
} }
ctx->write = writeCtxDoWrite; ctx->write = idxFileCtxDoWrite;
ctx->read = writeCtxDoRead; ctx->read = idxFileCtxDoRead;
ctx->flush = writeCtxDoFlush; ctx->flush = idxFileCtxDoFlush;
ctx->readFrom = writeCtxDoReadFrom; ctx->readFrom = idxFileCtxDoReadFrom;
ctx->size = writeCtxGetSize; ctx->size = idxFileCtxGetSize;
ctx->offset = 0; ctx->offset = 0;
ctx->limit = capacity; ctx->limit = capacity;
...@@ -135,7 +132,7 @@ END: ...@@ -135,7 +132,7 @@ END:
taosMemoryFree(ctx); taosMemoryFree(ctx);
return NULL; return NULL;
} }
void writerCtxDestroy(WriterCtx* ctx, bool remove) { void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
if (ctx->type == TMemory) { if (ctx->type == TMemory) {
taosMemoryFree(ctx->mem.buf); taosMemoryFree(ctx->mem.buf);
} else { } else {
...@@ -149,9 +146,6 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { ...@@ -149,9 +146,6 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
if (ctx->file.readOnly == false) { if (ctx->file.readOnly == false) {
int64_t file_size = 0; int64_t file_size = 0;
taosStatFile(ctx->file.buf, &file_size, NULL); taosStatFile(ctx->file.buf, &file_size, NULL);
// struct stat fstat;
// stat(ctx->file.buf, &fstat);
// indexError("write file size: %d", (int)(fstat.st_size));
} }
if (remove) { if (remove) {
unlink(ctx->file.buf); unlink(ctx->file.buf);
...@@ -160,30 +154,29 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { ...@@ -160,30 +154,29 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
taosMemoryFree(ctx); taosMemoryFree(ctx);
} }
FstCountingWriter* fstCountingWriterCreate(void* wrt) { IdxFstFile* idxFileCreate(void* wrt) {
FstCountingWriter* cw = taosMemoryCalloc(1, sizeof(FstCountingWriter)); IdxFstFile* cw = taosMemoryCalloc(1, sizeof(IdxFstFile));
if (cw == NULL) { if (cw == NULL) {
return NULL; return NULL;
} }
cw->wrt = wrt; cw->wrt = wrt;
//(void *)(writerCtxCreate(TFile, readOnly));
return cw; return cw;
} }
void fstCountingWriterDestroy(FstCountingWriter* cw) { void idxFileDestroy(IdxFstFile* cw) {
// free wrt object: close fd or free mem // free wrt object: close fd or free mem
fstCountingWriterFlush(cw); idxFileFlush(cw);
// writerCtxDestroy((WriterCtx *)(cw->wrt)); // idxFileCtxDestroy((IFileCtx *)(cw->wrt));
taosMemoryFree(cw); taosMemoryFree(cw);
} }
int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len) { int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
if (write == NULL) { if (write == NULL) {
return 0; return 0;
} }
// update checksum // update checksum
// write data to file/socket or mem // write data to file/socket or mem
WriterCtx* ctx = write->wrt; IFileCtx* ctx = write->wrt;
int nWrite = ctx->write(ctx, buf, len); int nWrite = ctx->write(ctx, buf, len);
assert(nWrite == len); assert(nWrite == len);
...@@ -192,42 +185,41 @@ int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len) ...@@ -192,42 +185,41 @@ int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len)
write->summer = taosCalcChecksum(write->summer, buf, len); write->summer = taosCalcChecksum(write->summer, buf, len);
return len; return len;
} }
int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) { int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
if (write == NULL) { if (write == NULL) {
return 0; return 0;
} }
WriterCtx* ctx = write->wrt; IFileCtx* ctx = write->wrt;
int nRead = ctx->read(ctx, buf, len); int nRead = ctx->read(ctx, buf, len);
// assert(nRead == len); // assert(nRead == len);
return nRead; return nRead;
} }
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { uint32_t idxFileMaskedCheckSum(IdxFstFile* write) {
// opt // opt
return write->summer; return write->summer;
} }
int fstCountingWriterFlush(FstCountingWriter* write) { int idxFileFlush(IdxFstFile* write) {
WriterCtx* ctx = write->wrt; IFileCtx* ctx = write->wrt;
ctx->flush(ctx); ctx->flush(ctx);
// write->wtr->flush
return 1; return 1;
} }
void fstCountingWriterPackUintIn(FstCountingWriter* writer, uint64_t n, uint8_t nBytes) { void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes) {
assert(1 <= nBytes && nBytes <= 8); assert(1 <= nBytes && nBytes <= 8);
uint8_t* buf = taosMemoryCalloc(8, sizeof(uint8_t)); uint8_t* buf = taosMemoryCalloc(8, sizeof(uint8_t));
for (uint8_t i = 0; i < nBytes; i++) { for (uint8_t i = 0; i < nBytes; i++) {
buf[i] = (uint8_t)n; buf[i] = (uint8_t)n;
n = n >> 8; n = n >> 8;
} }
fstCountingWriterWrite(writer, buf, nBytes); idxFileWrite(writer, buf, nBytes);
taosMemoryFree(buf); taosMemoryFree(buf);
return; return;
} }
uint8_t fstCountingWriterPackUint(FstCountingWriter* writer, uint64_t n) { uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n) {
uint8_t nBytes = packSize(n); uint8_t nBytes = packSize(n);
fstCountingWriterPackUintIn(writer, n, nBytes); idxFilePackUintIn(writer, n, nBytes);
return nBytes; return nBytes;
} }
...@@ -95,7 +95,7 @@ void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) { ...@@ -95,7 +95,7 @@ void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) {
} }
} }
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr // bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile *wrt, CompiledAddr lastAddr, CompiledAddr
// startAddr) { // startAddr) {
// size_t sz = taosArrayGetSize(b->trans); // size_t sz = taosArrayGetSize(b->trans);
......
...@@ -75,7 +75,6 @@ CompiledAddr unpackDelta(char* data, uint64_t len, uint64_t nodeAddr) { ...@@ -75,7 +75,6 @@ CompiledAddr unpackDelta(char* data, uint64_t len, uint64_t nodeAddr) {
} }
// fst slice func // fst slice func
//
FstSlice fstSliceCreate(uint8_t* data, uint64_t len) { FstSlice fstSliceCreate(uint8_t* data, uint64_t len) {
FstString* str = (FstString*)taosMemoryMalloc(sizeof(FstString)); FstString* str = (FstString*)taosMemoryMalloc(sizeof(FstString));
...@@ -164,16 +163,3 @@ int fstSliceCompare(FstSlice* a, FstSlice* b) { ...@@ -164,16 +163,3 @@ int fstSliceCompare(FstSlice* a, FstSlice* b) {
return 0; return 0;
} }
} }
// FstStack* fstStackCreate(size_t elemSize, StackFreeElem freeFn) {
// FstStack *s = taosMemoryCalloc(1, sizeof(FstStack));
// if (s == NULL) { return NULL; }
// s->
// s->freeFn
//
//}
// void *fstStackPush(FstStack *s, void *elem);
// void *fstStackTop(FstStack *s);
// size_t fstStackLen(FstStack *s);
// void *fstStackGetAt(FstStack *s, size_t i);
// void fstStackDestory(FstStack *);
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "index.h" #include "index.h"
#include "indexComm.h" #include "indexComm.h"
#include "indexFst.h" #include "indexFst.h"
#include "indexFstCountingWriter.h" #include "indexFstFile.h"
#include "indexUtil.h" #include "indexUtil.h"
#include "taosdef.h" #include "taosdef.h"
#include "taoserror.h" #include "taoserror.h"
...@@ -103,7 +103,7 @@ TFileCache* tfileCacheCreate(const char* path) { ...@@ -103,7 +103,7 @@ TFileCache* tfileCacheCreate(const char* path) {
for (size_t i = 0; i < taosArrayGetSize(files); i++) { for (size_t i = 0; i < taosArrayGetSize(files); i++) {
char* file = taosArrayGetP(files, i); char* file = taosArrayGetP(files, i);
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64); IFileCtx* wc = idxFileCtxCreate(TFile, file, true, 1024 * 1024 * 64);
if (wc == NULL) { if (wc == NULL) {
indexError("failed to open index:%s", file); indexError("failed to open index:%s", file);
goto End; goto End;
...@@ -175,7 +175,7 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { ...@@ -175,7 +175,7 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
tfileReaderRef(reader); tfileReaderRef(reader);
return; return;
} }
TFileReader* tfileReaderCreate(WriterCtx* ctx) { TFileReader* tfileReaderCreate(IFileCtx* ctx) {
TFileReader* reader = taosMemoryCalloc(1, sizeof(TFileReader)); TFileReader* reader = taosMemoryCalloc(1, sizeof(TFileReader));
if (reader == NULL) { if (reader == NULL) {
return NULL; return NULL;
...@@ -216,7 +216,7 @@ void tfileReaderDestroy(TFileReader* reader) { ...@@ -216,7 +216,7 @@ void tfileReaderDestroy(TFileReader* reader) {
} else { } else {
indexInfo("%s is not removed", reader->ctx->file.buf); indexInfo("%s is not removed", reader->ctx->file.buf);
} }
writerCtxDestroy(reader->ctx, reader->remove); idxFileCtxDestroy(reader->ctx, reader->remove);
taosMemoryFree(reader); taosMemoryFree(reader);
} }
...@@ -490,7 +490,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c ...@@ -490,7 +490,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
char fullname[256] = {0}; char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version); tfileGenFileFullName(fullname, path, suid, colName, version);
// indexInfo("open write file name %s", fullname); // indexInfo("open write file name %s", fullname);
WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64); IFileCtx* wcx = idxFileCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
if (wcx == NULL) { if (wcx == NULL) {
return NULL; return NULL;
} }
...@@ -507,18 +507,18 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const c ...@@ -507,18 +507,18 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const c
char fullname[256] = {0}; char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version); tfileGenFileFullName(fullname, path, suid, colName, version);
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024); IFileCtx* wc = idxFileCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
if (wc == NULL) { if (wc == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr()); indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
return NULL; return NULL;
} }
indexTrace("open read file name:%s, file size: %d", wc->file.buf, wc->file.size); indexTrace("open read file name:%s, file size: %" PRId64 "", wc->file.buf, wc->file.size);
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
return reader; return reader;
} }
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header) {
TFileWriter* tw = taosMemoryCalloc(1, sizeof(TFileWriter)); TFileWriter* tw = taosMemoryCalloc(1, sizeof(TFileWriter));
if (tw == NULL) { if (tw == NULL) {
indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid); indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid);
...@@ -609,14 +609,14 @@ void tfileWriterClose(TFileWriter* tw) { ...@@ -609,14 +609,14 @@ void tfileWriterClose(TFileWriter* tw) {
if (tw == NULL) { if (tw == NULL) {
return; return;
} }
writerCtxDestroy(tw->ctx, false); idxFileCtxDestroy(tw->ctx, false);
taosMemoryFree(tw); taosMemoryFree(tw);
} }
void tfileWriterDestroy(TFileWriter* tw) { void tfileWriterDestroy(TFileWriter* tw) {
if (tw == NULL) { if (tw == NULL) {
return; return;
} }
writerCtxDestroy(tw->ctx, false); idxFileCtxDestroy(tw->ctx, false);
taosMemoryFree(tw); taosMemoryFree(tw);
} }
...@@ -892,8 +892,8 @@ static int tfileReaderLoadHeader(TFileReader* reader) { ...@@ -892,8 +892,8 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
return 0; return 0;
} }
static int tfileReaderLoadFst(TFileReader* reader) { static int tfileReaderLoadFst(TFileReader* reader) {
WriterCtx* ctx = reader->ctx; IFileCtx* ctx = reader->ctx;
int size = ctx->size(ctx); int size = ctx->size(ctx);
// current load fst into memory, refactor it later // current load fst into memory, refactor it later
int fstSize = size - reader->header.fstOffset - sizeof(tfileMagicNumber); int fstSize = size - reader->header.fstOffset - sizeof(tfileMagicNumber);
...@@ -905,8 +905,9 @@ static int tfileReaderLoadFst(TFileReader* reader) { ...@@ -905,8 +905,9 @@ static int tfileReaderLoadFst(TFileReader* reader) {
int64_t ts = taosGetTimestampUs(); int64_t ts = taosGetTimestampUs();
int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset); int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset);
int64_t cost = taosGetTimestampUs() - ts; int64_t cost = taosGetTimestampUs() - ts;
indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %" PRId64 "us", nread, indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %" PRId64 ", time cost: %" PRId64
reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost); "us",
nread, reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost);
// we assuse fst size less than FST_MAX_SIZE // we assuse fst size less than FST_MAX_SIZE
assert(nread > 0 && nread <= fstSize); assert(nread > 0 && nread <= fstSize);
...@@ -919,7 +920,7 @@ static int tfileReaderLoadFst(TFileReader* reader) { ...@@ -919,7 +920,7 @@ static int tfileReaderLoadFst(TFileReader* reader) {
} }
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) { static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
// TODO(yihao): opt later // TODO(yihao): opt later
WriterCtx* ctx = reader->ctx; IFileCtx* ctx = reader->ctx;
// add block cache // add block cache
char block[4096] = {0}; char block[4096] = {0};
int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset); int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset);
...@@ -952,7 +953,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* ...@@ -952,7 +953,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
} }
static int tfileReaderVerify(TFileReader* reader) { static int tfileReaderVerify(TFileReader* reader) {
// just validate header and Footer, file corrupted also shuild be verified later // just validate header and Footer, file corrupted also shuild be verified later
WriterCtx* ctx = reader->ctx; IFileCtx* ctx = reader->ctx;
uint64_t tMagicNumber = 0; uint64_t tMagicNumber = 0;
......
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
#include "index.h" #include "index.h"
#include "indexCache.h" #include "indexCache.h"
#include "indexFst.h" #include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h" #include "indexFstUtil.h"
#include "indexInt.h" #include "indexInt.h"
#include "indexTfile.h" #include "indexTfile.h"
...@@ -20,7 +19,7 @@ class FstWriter { ...@@ -20,7 +19,7 @@ class FstWriter {
public: public:
FstWriter() { FstWriter() {
taosRemoveFile(fileName.c_str()); taosRemoveFile(fileName.c_str());
_wc = writerCtxCreate(TFile, fileName.c_str(), false, 64 * 1024 * 1024); _wc = idxFileCtxCreate(TFile, fileName.c_str(), false, 64 * 1024 * 1024);
_b = fstBuilderCreate(_wc, 0); _b = fstBuilderCreate(_wc, 0);
} }
bool Put(const std::string& key, uint64_t val) { bool Put(const std::string& key, uint64_t val) {
...@@ -38,25 +37,25 @@ class FstWriter { ...@@ -38,25 +37,25 @@ class FstWriter {
fstBuilderFinish(_b); fstBuilderFinish(_b);
fstBuilderDestroy(_b); fstBuilderDestroy(_b);
writerCtxDestroy(_wc, false); idxFileCtxDestroy(_wc, false);
} }
private: private:
FstBuilder* _b; FstBuilder* _b;
WriterCtx* _wc; IFileCtx* _wc;
}; };
class FstReadMemory { class FstReadMemory {
public: public:
FstReadMemory(int32_t size, const std::string& fileName = TD_TMP_DIR_PATH "tindex.tindex") { FstReadMemory(int32_t size, const std::string& fileName = TD_TMP_DIR_PATH "tindex.tindex") {
_wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024); _wc = idxFileCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
_w = fstCountingWriterCreate(_wc); _w = idxFileCreate(_wc);
_size = size; _size = size;
memset((void*)&_s, 0, sizeof(_s)); memset((void*)&_s, 0, sizeof(_s));
} }
bool init() { bool init() {
char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size); char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size); int nRead = idxFileRead(_w, (uint8_t*)buf, _size);
if (nRead <= 0) { if (nRead <= 0) {
return false; return false;
} }
...@@ -141,18 +140,18 @@ class FstReadMemory { ...@@ -141,18 +140,18 @@ class FstReadMemory {
} }
~FstReadMemory() { ~FstReadMemory() {
fstCountingWriterDestroy(_w); idxFileDestroy(_w);
fstDestroy(_fst); fstDestroy(_fst);
fstSliceDestroy(&_s); fstSliceDestroy(&_s);
writerCtxDestroy(_wc, false); idxFileCtxDestroy(_wc, false);
} }
private: private:
FstCountingWriter* _w; IdxFstFile* _w;
Fst* _fst; Fst* _fst;
FstSlice _s; FstSlice _s;
WriterCtx* _wc; IFileCtx* _wc;
int32_t _size; int32_t _size;
}; };
#define L 100 #define L 100
......
...@@ -8,7 +8,6 @@ ...@@ -8,7 +8,6 @@
#include "index.h" #include "index.h"
#include "indexCache.h" #include "indexCache.h"
#include "indexFst.h" #include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h" #include "indexFstUtil.h"
#include "indexInt.h" #include "indexInt.h"
#include "indexTfile.h" #include "indexTfile.h"
...@@ -40,7 +39,7 @@ static void EnvCleanup() {} ...@@ -40,7 +39,7 @@ static void EnvCleanup() {}
class FstWriter { class FstWriter {
public: public:
FstWriter() { FstWriter() {
_wc = writerCtxCreate(TFile, tindex, false, 64 * 1024 * 1024); _wc = idxFileCtxCreate(TFile, tindex, false, 64 * 1024 * 1024);
_b = fstBuilderCreate(_wc, 0); _b = fstBuilderCreate(_wc, 0);
} }
bool Put(const std::string& key, uint64_t val) { bool Put(const std::string& key, uint64_t val) {
...@@ -58,25 +57,25 @@ class FstWriter { ...@@ -58,25 +57,25 @@ class FstWriter {
fstBuilderFinish(_b); fstBuilderFinish(_b);
fstBuilderDestroy(_b); fstBuilderDestroy(_b);
writerCtxDestroy(_wc, false); idxFileCtxDestroy(_wc, false);
} }
private: private:
FstBuilder* _b; FstBuilder* _b;
WriterCtx* _wc; IFileCtx* _wc;
}; };
class FstReadMemory { class FstReadMemory {
public: public:
FstReadMemory(size_t size) { FstReadMemory(size_t size) {
_wc = writerCtxCreate(TFile, tindex, true, 64 * 1024); _wc = idxFileCtxCreate(TFile, tindex, true, 64 * 1024);
_w = fstCountingWriterCreate(_wc); _w = idxFileCreate(_wc);
_size = size; _size = size;
memset((void*)&_s, 0, sizeof(_s)); memset((void*)&_s, 0, sizeof(_s));
} }
bool init() { bool init() {
char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size); char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size); int nRead = idxFileRead(_w, (uint8_t*)buf, _size);
if (nRead <= 0) { if (nRead <= 0) {
return false; return false;
} }
...@@ -130,18 +129,18 @@ class FstReadMemory { ...@@ -130,18 +129,18 @@ class FstReadMemory {
} }
~FstReadMemory() { ~FstReadMemory() {
fstCountingWriterDestroy(_w); idxFileDestroy(_w);
fstDestroy(_fst); fstDestroy(_fst);
fstSliceDestroy(&_s); fstSliceDestroy(&_s);
writerCtxDestroy(_wc, false); idxFileCtxDestroy(_wc, false);
} }
private: private:
FstCountingWriter* _w; IdxFstFile* _w;
Fst* _fst; Fst* _fst;
FstSlice _s; FstSlice _s;
WriterCtx* _wc; IFileCtx* _wc;
size_t _size; size_t _size;
}; };
class FstWriterEnv : public ::testing::Test { class FstWriterEnv : public ::testing::Test {
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include "index.h" #include "index.h"
#include "indexCache.h" #include "indexCache.h"
#include "indexFst.h" #include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h" #include "indexFstUtil.h"
#include "indexInt.h" #include "indexInt.h"
#include "indexTfile.h" #include "indexTfile.h"
...@@ -51,7 +50,7 @@ class DebugInfo { ...@@ -51,7 +50,7 @@ class DebugInfo {
class FstWriter { class FstWriter {
public: public:
FstWriter() { FstWriter() {
_wc = writerCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", false, 64 * 1024 * 1024); _wc = idxFileCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", false, 64 * 1024 * 1024);
_b = fstBuilderCreate(NULL, 0); _b = fstBuilderCreate(NULL, 0);
} }
bool Put(const std::string& key, uint64_t val) { bool Put(const std::string& key, uint64_t val) {
...@@ -64,25 +63,25 @@ class FstWriter { ...@@ -64,25 +63,25 @@ class FstWriter {
fstBuilderFinish(_b); fstBuilderFinish(_b);
fstBuilderDestroy(_b); fstBuilderDestroy(_b);
writerCtxDestroy(_wc, false); idxFileCtxDestroy(_wc, false);
} }
private: private:
FstBuilder* _b; FstBuilder* _b;
WriterCtx* _wc; IFileCtx* _wc;
}; };
class FstReadMemory { class FstReadMemory {
public: public:
FstReadMemory(size_t size) { FstReadMemory(size_t size) {
_wc = writerCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", true, 64 * 1024); _wc = idxFileCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", true, 64 * 1024);
_w = fstCountingWriterCreate(_wc); _w = idxFileCreate(_wc);
_size = size; _size = size;
memset((void*)&_s, 0, sizeof(_s)); memset((void*)&_s, 0, sizeof(_s));
} }
bool init() { bool init() {
char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size); char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size); int nRead = idxFileRead(_w, (uint8_t*)buf, _size);
if (nRead <= 0) { if (nRead <= 0) {
return false; return false;
} }
...@@ -124,18 +123,18 @@ class FstReadMemory { ...@@ -124,18 +123,18 @@ class FstReadMemory {
} }
~FstReadMemory() { ~FstReadMemory() {
fstCountingWriterDestroy(_w); idxFileDestroy(_w);
fstDestroy(_fst); fstDestroy(_fst);
fstSliceDestroy(&_s); fstSliceDestroy(&_s);
writerCtxDestroy(_wc, true); idxFileCtxDestroy(_wc, true);
} }
private: private:
FstCountingWriter* _w; IdxFstFile* _w;
Fst* _fst; Fst* _fst;
FstSlice _s; FstSlice _s;
WriterCtx* _wc; IFileCtx* _wc;
size_t _size; size_t _size;
}; };
#define L 100 #define L 100
...@@ -392,13 +391,13 @@ class TFileObj { ...@@ -392,13 +391,13 @@ class TFileObj {
fileName_ = path; fileName_ = path;
WriterCtx* ctx = writerCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024); IFileCtx* ctx = idxFileCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024);
writer_ = tfileWriterCreate(ctx, &header); writer_ = tfileWriterCreate(ctx, &header);
return writer_ != NULL ? true : false; return writer_ != NULL ? true : false;
} }
bool InitReader() { bool InitReader() {
WriterCtx* ctx = writerCtxCreate(TFile, fileName_.c_str(), true, 64 * 1024 * 1024); IFileCtx* ctx = idxFileCtxCreate(TFile, fileName_.c_str(), true, 64 * 1024 * 1024);
reader_ = tfileReaderCreate(ctx); reader_ = tfileReaderCreate(ctx);
return reader_ != NULL ? true : false; return reader_ != NULL ? true : false;
} }
......
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
#include "index.h" #include "index.h"
#include "indexCache.h" #include "indexCache.h"
#include "indexFst.h" #include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h" #include "indexFstUtil.h"
#include "indexInt.h" #include "indexInt.h"
#include "indexTfile.h" #include "indexTfile.h"
......
...@@ -8,7 +8,6 @@ ...@@ -8,7 +8,6 @@
#include "indexCache.h" #include "indexCache.h"
#include "indexComm.h" #include "indexComm.h"
#include "indexFst.h" #include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h" #include "indexFstUtil.h"
#include "indexInt.h" #include "indexInt.h"
#include "indexTfile.h" #include "indexTfile.h"
......
...@@ -479,6 +479,10 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { ...@@ -479,6 +479,10 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
} }
return true; return true;
} }
static int32_t transGetRefMgt() {
//
return refMgt;
}
static void transInitEnv() { static void transInitEnv() {
refMgt = transOpenExHandleMgt(50000); refMgt = transOpenExHandleMgt(50000);
...@@ -486,8 +490,9 @@ static void transInitEnv() { ...@@ -486,8 +490,9 @@ static void transInitEnv() {
} }
static void transDestroyEnv() { static void transDestroyEnv() {
// close ref // close ref
transCloseExHandleMgt(refMgt); transCloseExHandleMgt();
} }
void transInit() { void transInit() {
// init env // init env
taosThreadOnce(&transModuleInit, transInitEnv); taosThreadOnce(&transModuleInit, transInitEnv);
...@@ -502,25 +507,25 @@ int32_t transOpenExHandleMgt(int size) { ...@@ -502,25 +507,25 @@ int32_t transOpenExHandleMgt(int size) {
} }
void transCloseExHandleMgt() { void transCloseExHandleMgt() {
// close ref // close ref
taosCloseRef(refMgt); taosCloseRef(transGetRefMgt());
} }
int64_t transAddExHandle(void* p) { int64_t transAddExHandle(void* p) {
// acquire extern handle // acquire extern handle
return taosAddRef(refMgt, p); return taosAddRef(transGetRefMgt(), p);
} }
int32_t transRemoveExHandle(int64_t refId) { int32_t transRemoveExHandle(int64_t refId) {
// acquire extern handle // acquire extern handle
return taosRemoveRef(refMgt, refId); return taosRemoveRef(transGetRefMgt(), refId);
} }
SExHandle* transAcquireExHandle(int64_t refId) { SExHandle* transAcquireExHandle(int64_t refId) {
// acquire extern handle // acquire extern handle
return (SExHandle*)taosAcquireRef(refMgt, refId); return (SExHandle*)taosAcquireRef(transGetRefMgt(), refId);
} }
int32_t transReleaseExHandle(int64_t refId) { int32_t transReleaseExHandle(int64_t refId) {
// release extern handle // release extern handle
return taosReleaseRef(refMgt, refId); return taosReleaseRef(transGetRefMgt(), refId);
} }
void transDestoryExHandle(void* handle) { void transDestoryExHandle(void* handle) {
if (handle == NULL) { if (handle == NULL) {
......
...@@ -598,6 +598,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset ...@@ -598,6 +598,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Invalid index file")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册