diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 742427bf94d3e76f94802d1c19a032c80912f5b9..a6862c05c89aac6aa81c111c68df9b64a1673516 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -17,6 +17,7 @@ #define _TD_INDEX_INT_H_ #include "index.h" +#include "tlog.h" #ifdef USE_LUCENE #include @@ -60,6 +61,16 @@ typedef struct SIndexTermQuery { SIndexTerm *indexTermCreate(const char *key, int32_t nKey, const char *val, int32_t nVal); void indexTermDestroy(SIndexTerm *p); + + +#define indexFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); }} while(0) +#define indexError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); }} while(0) +#define indexWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("index WARN ", 255, __VA_ARGS__); }} while(0) +#define indexInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("index ", 255, __VA_ARGS__); }} while(0) +#define indexDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); }} while(0) +#define indexTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); }} while(0) + + #ifdef __cplusplus } #endif diff --git a/source/libs/index/inc/index_fst_counting_writer.h b/source/libs/index/inc/index_fst_counting_writer.h index 9280461780fc5aeaeace7339ff114849432bf695..ea090389bba1ff1a12b91ebcac2d15827916286a 100644 --- a/source/libs/index/inc/index_fst_counting_writer.h +++ b/source/libs/index/inc/index_fst_counting_writer.h @@ -25,7 +25,7 @@ extern "C" { #define DefaultMem 1024*1024 -static char tmpFile[] = "/tmp/index"; +static char tmpFile[] = "./index"; typedef enum WriterType {TMemory, TFile} WriterType; typedef struct WriterCtx { @@ -45,7 +45,7 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len); static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len); static int writeCtxDoFlush(WriterCtx *ctx); -WriterCtx* writerCtxCreate(WriterType type); +WriterCtx* writerCtxCreate(WriterType type, bool readOnly); void writerCtxDestroy(WriterCtx *w); typedef uint32_t CheckSummer; @@ -57,14 +57,16 @@ typedef struct FstCountingWriter { CheckSummer summer; } FstCountingWriter; -int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen); +int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t len); + +int fstCountingWriterRead(FstCountingWriter *write, uint8_t *buf, uint32_t len); int fstCountingWriterFlush(FstCountingWriter *write); uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write); -FstCountingWriter *fstCountingWriterCreate(void *wtr); +FstCountingWriter *fstCountingWriterCreate(void *wtr, bool readOnly); void fstCountingWriterDestroy(FstCountingWriter *w); diff --git a/source/libs/index/inc/index_fst_util.h b/source/libs/index/inc/index_fst_util.h index 4af885816f59fd1fbe2f178254ed8f14fdc4e834..24b2508678e4664704ffe4425f779d84a876c296 100644 --- a/source/libs/index/inc/index_fst_util.h +++ b/source/libs/index/inc/index_fst_util.h @@ -93,6 +93,28 @@ uint8_t *fstSliceData(FstSlice *s, int32_t *sz); #define FST_SLICE_LEN(s) (s->end - s->start + 1) +//// stack +// +//typedef (*StackFreeElemFn)(void *elem); +// +//typedef struct FstStack { +// void *first; +// void *end; +// size_t elemSize; +// size_t nElem; +// StackFreeElemFn fn; +//} FstStack; +// +// +//FstStack* fstStackCreate(size_t elemSize, stackFreeElem); +//void *fstStackPush(FstStack *s, void *elem); +//void *fstStackTop(FstStack *s); +//size_t fstStackLen(FstStack *s); +//void fstStackDestory(FstStack *); +// + + + #ifdef __cplusplus } #endif diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 418d1f2bda0bad238ecdaf3eb47118a5fad7789a..0cc95b738d36a25694eb15280346f17aa74b6c47 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -16,6 +16,7 @@ #include "index_fst.h" #include "tcoding.h" #include "tchecksum.h" +#include "indexInt.h" static void fstPackDeltaIn(FstCountingWriter *wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) { @@ -40,6 +41,7 @@ void unFinishedNodeDestroyElem(void* elem) { FstBuilderNodeUnfinished *b = (FstBuilderNodeUnfinished*)elem; fstBuilderNodeDestroy(b->node); free(b->last); + b->last = NULL; } void fstUnFinishedNodesDestroy(FstUnFinishedNodes *nodes) { if (nodes == NULL) { return; } @@ -62,15 +64,15 @@ FstBuilderNode *fstUnFinishedNodesPopRoot(FstUnFinishedNodes *nodes) { assert(taosArrayGetSize(nodes->stack) == 1); FstBuilderNodeUnfinished *un = taosArrayPop(nodes->stack); - //assert(un->last == NULL); + assert(un->last == NULL); return un->node; } FstBuilderNode *fstUnFinishedNodesPopFreeze(FstUnFinishedNodes *nodes, CompiledAddr addr) { FstBuilderNodeUnfinished *un = taosArrayPop(nodes->stack); fstBuilderNodeUnfinishedLastCompiled(un, addr); - free(un->last); // TODO add func FstLastTransitionFree() - un->last = NULL; + //free(un->last); // TODO add func FstLastTransitionFree() + //un->last = NULL; return un->node; } @@ -147,7 +149,7 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node, size_t lsz = (size_t)(s->end - s->start + 1); // data len size_t ssz = taosArrayGetSize(node->stack); // stack size - + *out = in; uint64_t i = 0; for (i = 0; i < lsz && i < ssz; i++) { FstBuilderNodeUnfinished *un = taosArrayGet(node->stack, i); @@ -272,16 +274,16 @@ void fstStateCompileForAnyTrans(FstCountingWriter *w, CompiledAddr addr, FstBuil if (FST_BUILDER_NODE_IS_FINAL(node)) { fstCountingWriterPackUintIn(w, node->finalOutput, oSize); } - for (size_t i = 0; i < sz; i++) { + for (int32_t i = sz - 1; i >= 0; i--) { FstTransition *t = taosArrayGet(node->trans, i); fstCountingWriterPackUintIn(w, t->out, oSize); } } - for (size_t i = 0; i < sz; i++) { + for (int32_t i = sz - 1; i >= 0; i--) { FstTransition *t = taosArrayGet(node->trans, i); fstPackDeltaIn(w, addr, t->addr, tSize); } - for (size_t i = 0; i < sz; i++) { + for (int32_t i = sz - 1; i >= 0; i--) { FstTransition *t = taosArrayGet(node->trans, i); fstCountingWriterWrite(w, (char *)&t->inp, 1); //fstPackDeltaIn(w, addr, t->addr, tSize); @@ -402,7 +404,7 @@ CompiledAddr fstStateTransAddr(FstState *s, FstNode *node) { assert(s->state == OneTransNext || s->state == OneTrans); FstSlice *slice = &node->data; if (s->state == OneTransNext) { - return (CompiledAddr)(node->end); + return (CompiledAddr)(node->end) - 1; } else { PackSizes sizes = node->sizes; uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(sizes); @@ -457,7 +459,7 @@ Output fstStateOutput(FstState *s, FstNode *node) { uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes); uint64_t i = node->start - - fstStateInputLen(s); + - fstStateInputLen(s) - 1 - tSizes - oSizes; @@ -618,7 +620,7 @@ FstNode *fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice *slice) { n->version = version; n->state = st; n->start = addr; - n->end = fstStateEndAddrForOneTransNext(&st, slice); //? s.end_addr(data); + n->end = fstStateEndAddrForOneTransNext(&st, &n->data); //? s.end_addr(data); n->isFinal = false; n->sizes = 0; n->nTrans = 1; @@ -630,23 +632,24 @@ FstNode *fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice *slice) { n->version = version; n->state = st; n->start = addr; - n->end = fstStateEndAddrForOneTrans(&st, slice, sz); // s.end_addr(data, sz); + n->end = fstStateEndAddrForOneTrans(&st, &data, sz); // s.end_addr(data, sz); n->isFinal = false; n->nTrans = 1; n->sizes = sz; n->finalOutput = 0; } else { - uint64_t sz = fstStateSizes(&st, slice); // s.sizes(data) - uint32_t nTrans = fstStateNtrans(&st, slice); // s.ntrans(data) - n->data = *slice; + FstSlice data = fstSliceCopy(slice, 0, addr); + uint64_t sz = fstStateSizes(&st, &data); // s.sizes(data) + uint32_t nTrans = fstStateNtrans(&st, &data); // s.ntrans(data) + n->data = data; n->version = version; n->state = st; n->start = addr; - n->end = fstStateEndAddrForAnyTrans(&st, version, slice, sz, nTrans); // s.end_addr(version, data, sz, ntrans); + n->end = fstStateEndAddrForAnyTrans(&st, version, &data, sz, nTrans); // s.end_addr(version, data, sz, ntrans); n->isFinal = fstStateIsFinalState(&st); // s.is_final_state(); n->nTrans = nTrans; n->sizes = sz; - n->finalOutput = fstStateFinalOutput(&st, version, slice, sz, nTrans); // s.final_output(version, data, sz, ntrans); + n->finalOutput = fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans); } return n; } @@ -769,12 +772,23 @@ FstBuilder *fstBuilderCreate(void *w, FstType ty) { if (NULL == b) { return b; } - b->wrt = fstCountingWriterCreate(w); + b->wrt = fstCountingWriterCreate(w, false); b->unfinished = fstUnFinishedNodesCreate(); b->registry = fstRegistryCreate(10000, 2) ; b->last = fstSliceCreate(NULL, 0); b->lastAddr = NONE_ADDRESS; b->len = 0; + + char buf64[8] = {0}; + void *pBuf64 = buf64; + taosEncodeFixedU64(&pBuf64, VERSION); + fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64)); + + memset(buf64, 0, sizeof(buf64)); + pBuf64 = buf64; + taosEncodeFixedU64(&pBuf64, ty); + fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64)); + return b; } void fstBuilderDestroy(FstBuilder *b) { @@ -794,6 +808,7 @@ bool fstBuilderInsert(FstBuilder *b, FstSlice bs, Output in) { fstBuilderInsertOutput(b, bs, in); return true; } + indexInfo("key must be ordered"); return false; } @@ -810,7 +825,7 @@ void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in) { // prefixLen = fstUnFinishedNodesFindCommPrefix(b->unfinished, bs); // out = 0; //} - Output out; + Output out; uint64_t prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out); if (prefixLen == FST_SLICE_LEN(s)) { @@ -937,9 +952,10 @@ void fstLastTransitionDestroy(FstLastTransition *trn) { void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished *unNode, CompiledAddr addr) { FstLastTransition *trn = unNode->last; if (trn == NULL) { return; } - FstTransition t = {.inp = trn->inp, .out = trn->out, .addr = addr}; taosArrayPush(unNode->node->trans, &t); + fstLastTransitionDestroy(trn); + unNode->last = NULL; return; } @@ -1028,8 +1044,9 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) { for (uint32_t i = 0; i < len; i++) { uint8_t inp = data[i]; Output res = 0; - bool null = fstNodeFindInput(root, inp, &res); - if (null) { return false; } + if (false == fstNodeFindInput(root, inp, &res)) { + return false; + } FstTransition trn; fstNodeGetTransitionAt(root, res, &trn); diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 6820292e650fd7375a6b358974ea9055779e2efa..9ec346cebc026cba7d03e1cf39890a6c262d1a9a 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ #include "tutil.h" +#include "indexInt.h" #include "index_fst_util.h" #include "index_fst_counting_writer.h" @@ -22,7 +23,7 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) { } if (ctx->type == TFile) { - assert(len != tfWrite(ctx->fd, buf, len)); + assert(len == tfWrite(ctx->fd, buf, len)); } else { memcpy(ctx->mem + ctx->offset, buf, len); } @@ -30,17 +31,19 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) { return len; } static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) { + int nRead = 0; if (ctx->type == TFile) { - tfRead(ctx->fd, buf, len); + nRead = tfRead(ctx->fd, buf, len); } else { memcpy(buf, ctx->mem + ctx->offset, len); } - ctx->offset += len; + ctx->offset += nRead; - return 1; + return nRead; } static int writeCtxDoFlush(WriterCtx *ctx) { if (ctx->type == TFile) { + //tfFsync(ctx->fd); //tfFlush(ctx->fd); } else { // do nothing @@ -48,15 +51,21 @@ static int writeCtxDoFlush(WriterCtx *ctx) { return 1; } -WriterCtx* writerCtxCreate(WriterType type) { +WriterCtx* writerCtxCreate(WriterType type, bool readOnly) { WriterCtx *ctx = calloc(1, sizeof(WriterCtx)); if (ctx == NULL) { return NULL; } ctx->type = type; if (ctx->type == TFile) { - ctx->fd = tfOpenCreateWriteAppend(tmpFile); + tfInit(); + // ugly code, refactor later + if (readOnly == false) { + ctx->fd = tfOpenCreateWriteAppend(tmpFile); + } else { + ctx->fd = tfOpenReadWrite(tmpFile); + } if (ctx->fd < 0) { - + indexError("open file error %d", errno); } } else if (ctx->type == TMemory) { ctx->mem = calloc(1, DefaultMem * sizeof(uint8_t)); @@ -75,33 +84,43 @@ void writerCtxDestroy(WriterCtx *ctx) { free(ctx->mem); } else { tfClose(ctx->fd); + tfCleanup(); } free(ctx); } -FstCountingWriter *fstCountingWriterCreate(void *wrt) { +FstCountingWriter *fstCountingWriterCreate(void *wrt, bool readOnly) { FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter)); if (cw == NULL) { return NULL; } - cw->wrt = (void *)(writerCtxCreate(TFile)); + cw->wrt = (void *)(writerCtxCreate(TFile, readOnly)); return cw; } void fstCountingWriterDestroy(FstCountingWriter *cw) { // free wrt object: close fd or free mem + fstCountingWriterFlush(cw); writerCtxDestroy((WriterCtx *)(cw->wrt)); free(cw); } -int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen) { +int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t len) { if (write == NULL) { return 0; } // update checksum // write data to file/socket or mem WriterCtx *ctx = write->wrt; - int nWrite = ctx->write(ctx, buf, bufLen); - write->count += nWrite; - return bufLen; + int nWrite = ctx->write(ctx, buf, len); + assert(nWrite == len); + write->count += len; + return len; +} +int fstCountingWriterRead(FstCountingWriter *write, uint8_t *buf, uint32_t len) { + if (write == NULL) { return 0; } + WriterCtx *ctx = write->wrt; + int nRead = ctx->read(ctx, buf, len); + //assert(nRead == len); + return nRead; } uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) { diff --git a/source/libs/index/src/index_fst_util.c b/source/libs/index/src/index_fst_util.c index 532b0b8ac3704ae88449959b6739e8bc6b436011..c933c6d23b2eed040bcf9fd276d783c13b7ad025 100644 --- a/source/libs/index/src/index_fst_util.c +++ b/source/libs/index/src/index_fst_util.c @@ -67,7 +67,7 @@ uint8_t packSize(uint64_t n) { } uint64_t unpackUint64(uint8_t *ch, uint8_t sz) { - uint64_t n; + uint64_t n = 0; for (uint8_t i = 0; i < sz; i++) { n = n | (ch[i] << (8 * i)); } @@ -174,5 +174,18 @@ int fstSliceCompare(FstSlice *a, FstSlice *b) { else { return 0; } } +//FstStack* fstStackCreate(size_t elemSize, StackFreeElem freeFn) { +// FstStack *s = calloc(1, sizeof(FstStack)); +// if (s == NULL) { return NULL; } +// s-> +// s->freeFn +// +//} +//void *fstStackPush(FstStack *s, void *elem); +//void *fstStackTop(FstStack *s); +//size_t fstStackLen(FstStack *s); +//void *fstStackGetAt(FstStack *s, size_t i); +//void fstStackDestory(FstStack *); + diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cpp index 14ff2caf120d8700472eb9d9876bad753471a225..858861529d5bdb60a3dcdcef43440b7676404a06 100644 --- a/source/libs/index/test/indexTests.cpp +++ b/source/libs/index/test/indexTests.cpp @@ -63,19 +63,67 @@ //} int main(int argc, char** argv) { - std::string str("abc"); - FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size()); - Output val = 1; + // test write + FstBuilder *b = fstBuilderCreate(NULL, 0); + { + std::string str("aaa"); + FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size()); + Output val = 1; + fstBuilderInsert(b, key, val); + } //std::string str1("bcd"); //FstSlice key1 = fstSliceCreate((uint8_t *)str1.c_str(), str1.size()); //Output val2 = 10; - FstBuilder *b = fstBuilderCreate(NULL, 1); - fstBuilderInsert(b, key, val); - //fstBuilderInsert(b, key1, val2); + // + + { + + for (size_t i = 1; i < 26; i++) { + std::string str("aaa"); + str[2] = 'a' + i ; + FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size()); + Output val = 2; + fstBuilderInsert(b, key, val); + } + + } fstBuilderFinish(b); fstBuilderDestroy(b); - fstSliceDestroy(&key); + + + char buf[64 * 1024] = {0}; + + FstSlice s; + + FstCountingWriter *w = fstCountingWriterCreate(NULL, true); + int nRead = fstCountingWriterRead(w, (uint8_t *)buf, sizeof(buf)); + assert(nRead <= sizeof(buf)); + s = fstSliceCreate((uint8_t *)buf, nRead); + fstCountingWriterDestroy(w); + + + // test reader + + + Fst *fst = fstCreate(&s); + { + std::string str("aaa"); + uint64_t out; + + + FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size()); + bool ok = fstGet(fst, &key, &out); + if (ok == true) { + //indexInfo("Get key-value success, %s, %d", str.c_str(), out); + } else { + //indexError("Get key-value failed, %s", str.c_str()); + } + } + fstSliceDestroy(&s); + + + return 1; }