提交 5d4d7b47 编写于 作者: dengyihao's avatar dengyihao

update index TFile manage and format code

上级 6fe118c5
......@@ -23,64 +23,63 @@
extern "C" {
#endif
typedef struct SIndex SIndex;
typedef struct SIndexTerm SIndexTerm;
typedef struct SIndexOpts SIndexOpts;
typedef struct SIndex SIndex;
typedef struct SIndexTerm SIndexTerm;
typedef struct SIndexOpts SIndexOpts;
typedef struct SIndexMultiTermQuery SIndexMultiTermQuery;
typedef struct SArray SIndexMultiTerm;
typedef enum {
ADD_VALUE, // add index colume value
DEL_VALUE, // delete index column value
UPDATE_VALUE, // update index column value
ADD_INDEX, // add index on specify column
DROP_INDEX, // drop existed index
DROP_SATBLE // drop stable
typedef enum {
ADD_VALUE, // add index colume value
DEL_VALUE, // delete index column value
UPDATE_VALUE, // update index column value
ADD_INDEX, // add index on specify column
DROP_INDEX, // drop existed index
DROP_SATBLE // drop stable
} SIndexOperOnColumn;
typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType;
typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2,QUERY_REGEX = 3} EIndexQueryType;
typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType;
typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2, QUERY_REGEX = 3 } EIndexQueryType;
/*
* @param: oper
* @param: oper
*
*/
*/
SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType oper);
void indexMultiTermQueryDestroy(SIndexMultiTermQuery *pQuery);
int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm *term, EIndexQueryType type);
/*
* @param:
void indexMultiTermQueryDestroy(SIndexMultiTermQuery *pQuery);
int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm *term, EIndexQueryType type);
/*
* @param:
* @param:
*/
int indexOpen(SIndexOpts *opt, const char *path, SIndex **index);
void indexClose(SIndex *index);
int indexPut(SIndex *index, SIndexMultiTerm *terms, uint64_t uid);
int indexDelete(SIndex *index, SIndexMultiTermQuery *query);
int indexSearch(SIndex *index, SIndexMultiTermQuery *query, SArray *result);
int indexRebuild(SIndex *index, SIndexOpts *opt);
int indexOpen(SIndexOpts *opt, const char *path, SIndex **index);
void indexClose(SIndex *index);
int indexPut(SIndex *index, SIndexMultiTerm *terms, uint64_t uid);
int indexDelete(SIndex *index, SIndexMultiTermQuery *query);
int indexSearch(SIndex *index, SIndexMultiTermQuery *query, SArray *result);
int indexRebuild(SIndex *index, SIndexOpts *opt);
/*
* @param
* @param
*/
SIndexMultiTerm *indexMultiTermCreate();
int indexMultiTermAdd(SIndexMultiTerm *terms, SIndexTerm *term);
void indexMultiTermDestroy(SIndexMultiTerm *terms);
SIndexMultiTerm *indexMultiTermCreate();
int indexMultiTermAdd(SIndexMultiTerm *terms, SIndexTerm *term);
void indexMultiTermDestroy(SIndexMultiTerm *terms);
/*
* @param:
* @param:
* @param:
*/
SIndexOpts *indexOptsCreate();
void indexOptsDestroy(SIndexOpts *opts);
void indexOptsDestroy(SIndexOpts *opts);
/*
* @param:
* @param:
*/
SIndexTerm *indexTermCreate(int64_t suid, SIndexOperOnColumn operType, uint8_t colType,
const char *colName, int32_t nColName, const char *colVal, int32_t nColVal);
SIndexTerm *indexTermCreate(int64_t suid, SIndexOperOnColumn operType, uint8_t colType, const char *colName,
int32_t nColName, const char *colVal, int32_t nColVal);
void indexTermDestroy(SIndexTerm *p);
#ifdef __cplusplus
}
#endif
......
......@@ -18,87 +18,112 @@
#include "index.h"
#include "index_fst.h"
#include "tlog.h"
#include "thash.h"
#include "taos.h"
#include "thash.h"
#include "tlog.h"
#ifdef USE_LUCENE
#include <lucene++/Lucene_c.h>
#endif
#ifdef __cplusplus
extern "C" {
#endif
typedef enum {kTypeValue, kTypeDeletion} STermValueType ;
typedef enum { kTypeValue, kTypeDeletion } STermValueType;
typedef struct SIndexStat {
int32_t totalAdded; //
int32_t totalDeled; //
int32_t totalUpdated; //
int32_t totalTerms; //
int32_t distinctCol; // distinct column
} SIndexStat;
int32_t totalAdded; //
int32_t totalDeled; //
int32_t totalUpdated; //
int32_t totalTerms; //
int32_t distinctCol; // distinct column
} SIndexStat;
struct SIndex {
#ifdef USE_LUCENE
index_t *index;
#endif
void *cache;
void *tindex;
SHashObj *colObj;// < field name, field id>
int64_t suid; // current super table id, -1 is normal table
int colId; // field id allocated to cache
int32_t cVersion; // current version allocated to cache
SIndexStat stat;
pthread_mutex_t mtx;
};
#ifdef USE_LUCENE
index_t *index;
#endif
void * cache;
void * tindex;
SHashObj *colObj; // < field name, field id>
int64_t suid; // current super table id, -1 is normal table
int colId; // field id allocated to cache
int32_t cVersion; // current version allocated to cache
SIndexStat stat;
pthread_mutex_t mtx;
};
struct SIndexOpts {
#ifdef USE_LUCENE
void *opts;
#endif
#ifdef USE_LUCENE
void *opts;
#endif
#ifdef USE_INVERTED_INDEX
int32_t cacheSize; // MB
int32_t cacheSize; // MB
// add cache module later
#endif
};
struct SIndexMultiTermQuery {
EIndexOperatorType opera;
SArray *query;
EIndexOperatorType opera;
SArray * query;
};
// field and key;
typedef struct SIndexTerm {
int64_t suid;
SIndexOperOnColumn operType; // oper type, add/del/update
uint8_t colType; // term data type, str/interger/json
char *colName;
int32_t nColName;
char *colVal;
int32_t nColVal;
int64_t suid;
SIndexOperOnColumn operType; // oper type, add/del/update
uint8_t colType; // term data type, str/interger/json
char * colName;
int32_t nColName;
char * colVal;
int32_t nColVal;
} SIndexTerm;
typedef struct SIndexTermQuery {
SIndexTerm* term;
SIndexTerm * term;
EIndexQueryType qType;
} SIndexTermQuery;
#define indexFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); }} while(0)
#define indexError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); }} while(0)
#define indexWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("index WARN ", 255, __VA_ARGS__); }} while(0)
#define indexInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("index ", 255, __VA_ARGS__); }} while(0)
#define indexDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); }} while(0)
#define indexTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); }} while(0)
#define indexFatal(...) \
do { \
if (sDebugFlag & DEBUG_FATAL) { \
taosPrintLog("index FATAL ", 255, __VA_ARGS__); \
} \
} while (0)
#define indexError(...) \
do { \
if (sDebugFlag & DEBUG_ERROR) { \
taosPrintLog("index ERROR ", 255, __VA_ARGS__); \
} \
} while (0)
#define indexWarn(...) \
do { \
if (sDebugFlag & DEBUG_WARN) { \
taosPrintLog("index WARN ", 255, __VA_ARGS__); \
} \
} while (0)
#define indexInfo(...) \
do { \
if (sDebugFlag & DEBUG_INFO) { \
taosPrintLog("index ", 255, __VA_ARGS__); \
} \
} while (0)
#define indexDebug(...) \
do { \
if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define indexTrace(...) \
do { \
if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
} while (0)
#ifdef __cplusplus
}
......
......@@ -22,8 +22,10 @@
// ----------------- key structure in skiplist ---------------------
/* A data row, the format is like below:
* content: |<--totalLen-->|<-- fieldid-->|<--field type -->|<-- value len--->|<-- value -->|<-- uid -->|<--version--->|<-- itermType -->|
* len : |<--int32_t -->|<-- int16_t-->|<-- int8_t --->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->|
* content: |<--totalLen-->|<-- fieldid-->|<--field type-->|<-- value len--->|
* |<-- value -->|<--uid -->|<--version--->|<-- itermType -->|
* len : |<--int32_t -->|<-- int16_t-->|<-- int8_t --->|<--- int32_t --->|
* <--valuelen->|<--uint64_t->| * <-- int32_t-->|<-- int8_t --->|
*/
#ifdef __cplusplus
......@@ -31,25 +33,23 @@ extern "C" {
#endif
typedef struct IndexCache {
T_REF_DECLARE()
T_REF_DECLARE()
SSkipList *skiplist;
} IndexCache;
//
//
IndexCache *indexCacheCreate();
void indexCacheDestroy(void *cache);
int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, uint64_t uid);
int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, uint64_t uid);
//int indexCacheGet(void *cache, uint64_t *rst);
int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s);
// int indexCacheGet(void *cache, uint64_t *rst);
int indexCacheSearch(
void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s);
#ifdef __cplusplus
}
#endif
#endif
......@@ -20,29 +20,29 @@
extern "C" {
#endif
#include "tarray.h"
#include "index_fst_util.h"
#include "index_fst_registry.h"
#include "index_fst_counting_writer.h"
#include "index_fst_automation.h"
#include "index_fst_counting_writer.h"
#include "index_fst_registry.h"
#include "index_fst_util.h"
#include "tarray.h"
#define OUTPUT_PREFIX(a, b) ((a) > (b) ? (b) : (a)
#define OUTPUT_PREFIX(a, b) ((a) > (b) ? (b) : (a)
typedef struct Fst Fst;
typedef struct FstNode FstNode;
typedef struct Fst Fst;
typedef struct FstNode FstNode;
typedef struct StreamWithState StreamWithState;
typedef enum { Included, Excluded, Unbounded} FstBound;
typedef enum { Included, Excluded, Unbounded } FstBound;
typedef struct FstBoundWithData {
FstSlice data;
FstSlice data;
FstBound type;
} FstBoundWithData;
typedef struct FstStreamBuilder {
Fst *fst;
AutomationCtx *aut;
FstBoundWithData *min;
Fst * fst;
AutomationCtx * aut;
FstBoundWithData *min;
FstBoundWithData *max;
} FstStreamBuilder, FstStreamWithStateBuilder;
......@@ -51,17 +51,14 @@ typedef struct FstRange {
uint64_t end;
} FstRange;
typedef enum { GE, GT, LE, LT } RangeType;
typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal } State;
typedef enum { Ordered, OutOfOrdered, DuplicateKey } OrderType;
typedef enum {GE, GT, LE, LT} RangeType;
typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal} State;
typedef enum {Ordered, OutOfOrdered, DuplicateKey} OrderType;
FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data);
bool fstBoundWithDataExceededBy(FstBoundWithData *bound, FstSlice *slice);
bool fstBoundWithDataIsEmpty(FstBoundWithData *bound);
bool fstBoundWithDataIsIncluded(FstBoundWithData *bound);
FstBoundWithData *fstBoundStateCreate(FstBound type, FstSlice *data);
bool fstBoundWithDataExceededBy(FstBoundWithData *bound, FstSlice *slice);
bool fstBoundWithDataIsEmpty(FstBoundWithData *bound);
bool fstBoundWithDataIsIncluded(FstBoundWithData *bound);
typedef struct FstOutput {
bool null;
......@@ -69,110 +66,105 @@ typedef struct FstOutput {
} FstOutput;
/*
*
*
* UnFinished node and helper function
* TODO: simple function name
* TODO: simple function name
*/
typedef struct FstUnFinishedNodes {
SArray *stack; // <FstBuilderNodeUnfinished> } FstUnFinishedNodes;
SArray *stack; // <FstBuilderNodeUnfinished> } FstUnFinishedNodes;
} FstUnFinishedNodes;
#define FST_UNFINISHED_NODES_LEN(nodes) taosArrayGetSize(nodes->stack)
FstUnFinishedNodes *fstUnFinishedNodesCreate();
void fstUnFinishedNodesDestroy(FstUnFinishedNodes *node);
void fstUnFinishedNodesPushEmpty(FstUnFinishedNodes *nodes, bool isFinal);
FstBuilderNode *fstUnFinishedNodesPopRoot(FstUnFinishedNodes *nodes);
FstBuilderNode *fstUnFinishedNodesPopFreeze(FstUnFinishedNodes *nodes, CompiledAddr addr);
FstBuilderNode *fstUnFinishedNodesPopEmpty(FstUnFinishedNodes *nodes);
void fstUnFinishedNodesSetRootOutput(FstUnFinishedNodes *node, Output out);
void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes *node, CompiledAddr addr);
void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *node, FstSlice bs, Output out);
uint64_t fstUnFinishedNodesFindCommPrefix(FstUnFinishedNodes *node, FstSlice bs);
uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node, FstSlice bs, Output in, Output *out);
#define FST_UNFINISHED_NODES_LEN(nodes) taosArrayGetSize(nodes->stack)
FstUnFinishedNodes *fstUnFinishedNodesCreate();
void fstUnFinishedNodesDestroy(FstUnFinishedNodes *node);
void fstUnFinishedNodesPushEmpty(FstUnFinishedNodes *nodes, bool isFinal);
void fstUnFinishedNodesSetRootOutput(FstUnFinishedNodes *node, Output out);
void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes *node, CompiledAddr addr);
void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *node, FstSlice bs, Output out);
uint64_t fstUnFinishedNodesFindCommPrefix(FstUnFinishedNodes *node, FstSlice bs);
FstBuilderNode * fstUnFinishedNodesPopRoot(FstUnFinishedNodes *nodes);
FstBuilderNode * fstUnFinishedNodesPopFreeze(FstUnFinishedNodes *nodes, CompiledAddr addr);
FstBuilderNode * fstUnFinishedNodesPopEmpty(FstUnFinishedNodes *nodes);
uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node, FstSlice bs, Output in, Output *out);
typedef struct FstBuilder {
FstCountingWriter *wrt; // The FST raw data is written directly to `wtr`.
FstUnFinishedNodes *unfinished; // The stack of unfinished nodes
FstRegistry* registry; // A map of finished nodes.
FstSlice last; // The last word added
CompiledAddr lastAddr; // The address of the last compiled node
uint64_t len; // num of keys added
FstCountingWriter * wrt; // The FST raw data is written directly to `wtr`.
FstUnFinishedNodes *unfinished; // The stack of unfinished nodes
FstRegistry * registry; // A map of finished nodes.
FstSlice last; // The last word added
CompiledAddr lastAddr; // The address of the last compiled node
uint64_t len; // num of keys added
} FstBuilder;
FstBuilder *fstBuilderCreate(void *w, FstType ty);
void fstBuilderDestroy(FstBuilder *b);
void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in);
bool fstBuilderInsert(FstBuilder *b, FstSlice bs, Output in);
OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup);
void fstBuilderCompileFrom(FstBuilder *b, uint64_t istate);
void fstBuilderDestroy(FstBuilder *b);
void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in);
bool fstBuilderInsert(FstBuilder *b, FstSlice bs, Output in);
void fstBuilderCompileFrom(FstBuilder *b, uint64_t istate);
void * fstBuilerIntoInner(FstBuilder *b);
void fstBuilderFinish(FstBuilder *b);
OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup);
CompiledAddr fstBuilderCompile(FstBuilder *b, FstBuilderNode *bn);
void* fstBuilerIntoInner(FstBuilder *b);
void fstBuilderFinish(FstBuilder *b);
typedef struct FstTransitions {
FstNode *node;
FstRange range;
FstNode *node;
FstRange range;
} FstTransitions;
//FstState and relation function
// FstState and relation function
typedef struct FstState {
State state;
State state;
uint8_t val;
} FstState;
FstState fstStateCreateFrom(FstSlice* data, CompiledAddr addr);
FstState fstStateCreateFrom(FstSlice *data, CompiledAddr addr);
FstState fstStateCreate(State state);
//compile
// compile
void fstStateCompileForOneTransNext(FstCountingWriter *w, CompiledAddr addr, uint8_t inp);
void fstStateCompileForOneTrans(FstCountingWriter *w, CompiledAddr addr, FstTransition *trn);
void fstStateCompileForAnyTrans(FstCountingWriter *w, CompiledAddr addr, FstBuilderNode *node);
// set_comm_input
void fstStateSetCommInput(FstState* state, uint8_t inp);
void fstStateSetCommInput(FstState *state, uint8_t inp);
// comm_input
uint8_t fstStateCommInput(FstState* state, bool *null);
uint8_t fstStateCommInput(FstState *state, bool *null);
// input_len
uint64_t fstStateInputLen(FstState* state);
uint64_t fstStateInputLen(FstState *state);
// end_addr
uint64_t fstStateEndAddrForOneTransNext(FstState* state, FstSlice *data);
// end_addr
uint64_t fstStateEndAddrForOneTransNext(FstState *state, FstSlice *data);
uint64_t fstStateEndAddrForOneTrans(FstState *state, FstSlice *data, PackSizes sizes);
uint64_t fstStateEndAddrForAnyTrans(FstState *state, uint64_t version, FstSlice *date, PackSizes sizes, uint64_t nTrans);
// input
uint8_t fstStateInput(FstState *state, FstNode *node);
uint8_t fstStateInputForAnyTrans(FstState *state, FstNode *node, uint64_t i);
uint64_t fstStateEndAddrForAnyTrans(
FstState *state, uint64_t version, FstSlice *date, PackSizes sizes, uint64_t nTrans);
// input
uint8_t fstStateInput(FstState *state, FstNode *node);
uint8_t fstStateInputForAnyTrans(FstState *state, FstNode *node, uint64_t i);
// trans_addr
CompiledAddr fstStateTransAddr(FstState *state, FstNode *node);
CompiledAddr fstStateTransAddrForAnyTrans(FstState *state, FstNode *node, uint64_t i);
// sizes
// sizes
PackSizes fstStateSizes(FstState *state, FstSlice *data);
// Output
// Output
Output fstStateOutput(FstState *state, FstNode *node);
Output fstStateOutputForAnyTrans(FstState *state, FstNode *node, uint64_t i);
// anyTrans specify function
void fstStateSetFinalState(FstState *state, bool yes);
bool fstStateIsFinalState(FstState *state);
bool fstStateIsFinalState(FstState *state);
void fstStateSetStateNtrans(FstState *state, uint8_t n);
// state_ntrans
uint8_t fstStateStateNtrans(FstState *state, bool *null);
uint8_t fstStateStateNtrans(FstState *state, bool *null);
uint64_t fstStateTotalTransSize(FstState *state, uint64_t version, PackSizes size, uint64_t nTrans);
uint64_t fstStateTransIndexSize(FstState *state, uint64_t version, uint64_t nTrans);
uint64_t fstStateNtransLen(FstState *state);
......@@ -180,72 +172,72 @@ uint64_t fstStateNtrans(FstState *state, FstSlice *slice);
Output fstStateFinalOutput(FstState *state, uint64_t version, FstSlice *date, PackSizes sizes, uint64_t nTrans);
uint64_t fstStateFindInput(FstState *state, FstNode *node, uint8_t b, bool *null);
#define FST_STATE_ONE_TRNAS_NEXT(node) (node->state.state == OneTransNext)
#define FST_STATE_ONE_TRNAS_NEXT(node) (node->state.state == OneTransNext)
#define FST_STATE_ONE_TRNAS(node) (node->state.state == OneTrans)
#define FST_STATE_ANY_TRANS(node) (node->state.state == AnyTrans)
#define FST_STATE_EMPTY_FINAL(node) (node->state.state == EmptyFinal)
#define FST_STATE_EMPTY_FINAL(node) (node->state.state == EmptyFinal)
typedef struct FstLastTransition {
uint8_t inp;
Output out;
} FstLastTransition;
/*
/*
* FstBuilderNodeUnfinished and helper function
* TODO: simple function name
* TODO: simple function name
*/
typedef struct FstBuilderNodeUnfinished {
FstBuilderNode *node;
FstLastTransition* last;
FstBuilderNode * node;
FstLastTransition *last;
} FstBuilderNodeUnfinished;
void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished *node, CompiledAddr addr);
void fstBuilderNodeUnfinishedAddOutputPrefix(FstBuilderNodeUnfinished *node, Output out);
/*
* FstNode and helper function
* FstNode and helper function
*/
typedef struct FstNode {
FstSlice data;
uint64_t version;
uint64_t version;
FstState state;
CompiledAddr start;
CompiledAddr end;
CompiledAddr start;
CompiledAddr end;
bool isFinal;
uint64_t nTrans;
PackSizes sizes;
Output finalOutput;
Output finalOutput;
} FstNode;
// If this node is final and has a terminal output value, then it is, returned. Otherwise, a zero output is returned
// If this node is final and has a terminal output value, then it is, returned.
// Otherwise, a zero output is returned
#define FST_NODE_FINAL_OUTPUT(node) node->finalOutput
// Returns true if and only if this node corresponds to a final or "match", state in the finite state transducer.
// Returns true if and only if this node corresponds to a final or "match",
// state in the finite state transducer.
#define FST_NODE_IS_FINAL(node) node->isFinal
// Returns the number of transitions in this node, The maximum number of transitions is 256.
// Returns the number of transitions in this node, The maximum number of
// transitions is 256.
#define FST_NODE_LEN(node) node->nTrans
// Returns true if and only if this node has zero transitions.
#define FST_NODE_IS_EMPTYE(node) (node->nTrans == 0)
// Return the address of this node.
#define FST_NODE_ADDR(node) node->start
#define FST_NODE_ADDR(node) node->start
FstNode *fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice *data);
void fstNodeDestroy(FstNode *fstNode);
void fstNodeDestroy(FstNode *fstNode);
FstTransitions fstNodeTransitionIter(FstNode *node);
FstTransitions* fstNodeTransitions(FstNode *node);
bool fstNodeGetTransitionAt(FstNode *node, uint64_t i, FstTransition *res);
bool fstNodeGetTransitionAddrAt(FstNode *node, uint64_t i, CompiledAddr *res);
bool fstNodeFindInput(FstNode *node, uint8_t b, uint64_t *res);
bool fstNodeCompile(FstNode *node, void *w, CompiledAddr lastAddr, CompiledAddr addr, FstBuilderNode *builderNode);
FstSlice fstNodeAsSlice(FstNode *node);
FstTransitions fstNodeTransitionIter(FstNode *node);
FstTransitions *fstNodeTransitions(FstNode *node);
bool fstNodeGetTransitionAt(FstNode *node, uint64_t i, FstTransition *res);
bool fstNodeGetTransitionAddrAt(FstNode *node, uint64_t i, CompiledAddr *res);
bool fstNodeFindInput(FstNode *node, uint8_t b, uint64_t *res);
// ops
bool fstNodeCompile(FstNode *node, void *w, CompiledAddr lastAddr, CompiledAddr addr, FstBuilderNode *builderNode);
FstSlice fstNodeAsSlice(FstNode *node);
// ops
typedef struct FstIndexedValue {
uint64_t index;
......@@ -253,87 +245,87 @@ typedef struct FstIndexedValue {
} FstIndexedValue;
FstLastTransition *fstLastTransitionCreate(uint8_t inp, Output out);
void fstLastTransitionDestroy(FstLastTransition *trn);
void fstLastTransitionDestroy(FstLastTransition *trn);
typedef struct FstMeta {
uint64_t version;
CompiledAddr rootAddr;
CompiledAddr rootAddr;
FstType ty;
uint64_t len;
uint32_t checkSum;
} FstMeta;
typedef struct Fst {
FstMeta *meta;
FstSlice *data; //
FstNode *root; //
FstMeta * meta;
FstSlice *data; //
FstNode * root; //
} Fst;
// refactor simple function
// refactor simple function
Fst* fstCreate(FstSlice *data);
void fstDestroy(Fst *fst);
Fst *fstCreate(FstSlice *data);
void fstDestroy(Fst *fst);
bool fstGet(Fst *fst, FstSlice *b, Output *out);
FstNode* fstGetNode(Fst *fst, CompiledAddr);
FstNode* fstGetRoot(Fst *fst);
FstType fstGetType(Fst *fst);
CompiledAddr fstGetRootAddr(Fst *fst);
Output fstEmptyFinalOutput(Fst *fst, bool *null);
bool fstGet(Fst *fst, FstSlice *b, Output *out);
FstNode * fstGetNode(Fst *fst, CompiledAddr);
FstNode * fstGetRoot(Fst *fst);
FstType fstGetType(Fst *fst);
CompiledAddr fstGetRootAddr(Fst *fst);
Output fstEmptyFinalOutput(Fst *fst, bool *null);
FstStreamBuilder *fstSearch(Fst *fst, AutomationCtx *ctx);
FstStreamWithStateBuilder *fstSearchWithState(Fst *fst, AutomationCtx *ctx);
// into stream to expand later
StreamWithState* streamBuilderIntoStream(FstStreamBuilder *sb);
bool fstVerify(Fst *fst);
FstStreamWithStateBuilder *fstSearchWithState(Fst *fst, AutomationCtx *ctx);
// into stream to expand later
StreamWithState *streamBuilderIntoStream(FstStreamBuilder *sb);
bool fstVerify(Fst *fst);
//refactor this function
bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
// refactor this function
bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
typedef struct StreamState {
FstNode *node;
FstNode * node;
uint64_t trans;
FstOutput out;
void *autState;
} StreamState;
FstOutput out;
void * autState;
} StreamState;
void streamStateDestroy(void *s);
typedef struct StreamWithState {
Fst *fst;
AutomationCtx *aut;
SArray *inp;
FstOutput emptyOutput;
SArray *stack; // <StreamState>
Fst * fst;
AutomationCtx * aut;
SArray * inp;
FstOutput emptyOutput;
SArray * stack; // <StreamState>
FstBoundWithData *endAt;
} StreamWithState;
typedef struct StreamWithStateResult {
FstSlice data;
FstSlice data;
FstOutput out;
void *state;
void * state;
} StreamWithStateResult;
StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state);
void swsResultDestroy(StreamWithStateResult *result);
void swsResultDestroy(StreamWithStateResult *result);
typedef void *(*StreamCallback)(void *);
StreamWithState *streamWithStateCreate(
Fst *fst, AutomationCtx *automation, FstBoundWithData *min, FstBoundWithData *max);
typedef void* (*StreamCallback)(void *);
StreamWithState *streamWithStateCreate(Fst *fst, AutomationCtx *automation, FstBoundWithData *min, FstBoundWithData *max) ;
void streamWithStateDestroy(StreamWithState *sws);
bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min);
StreamWithStateResult* streamWithStateNextWith(StreamWithState *sws, StreamCallback callback);
bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min);
FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, AutomationCtx *aut);
StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallback callback);
FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, AutomationCtx *aut);
// set up bound range
// refator, simple code by marco
// refator, simple code by marco
FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, RangeType type);
#ifdef __cplusplus
}
#endif
......
......@@ -21,62 +21,56 @@ extern "C" {
#include "index_fst_util.h"
typedef struct AutomationCtx AutomationCtx;
typedef enum AutomationType {
AUTOMATION_PREFIX,
AUTMMATION_MATCH
} AutomationType;
typedef enum AutomationType { AUTOMATION_PREFIX, AUTMMATION_MATCH } AutomationType;
typedef struct StartWith {
AutomationCtx *autoSelf;
AutomationCtx *autoSelf;
} StartWith;
typedef struct Complement {
AutomationCtx *autoSelf;
} Complement;
// automation
// automation
typedef struct AutomationCtx {
AutomationType type;
void *stdata;
char *data;
AutomationType type;
void * stdata;
char * data;
} AutomationCtx;
typedef enum ValueType { FST_INT, FST_CHAR, FST_ARRAY} ValueType;
typedef enum StartWithStateKind { Done, Running } StartWithStateKind;
typedef enum ValueType { FST_INT, FST_CHAR, FST_ARRAY } ValueType;
typedef enum StartWithStateKind { Done, Running } StartWithStateKind;
typedef struct StartWithStateValue {
StartWithStateKind kind;
ValueType type;
ValueType type;
union {
int val;
char *ptr;
int val;
char * ptr;
SArray *arr;
// add more type
} ;
};
} StartWithStateValue;
StartWithStateValue *startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void *val);
StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv);
void startWithStateValueDestroy(void *sv);
StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv);
void startWithStateValueDestroy(void *sv);
typedef struct AutomationFunc {
void* (*start)(AutomationCtx *ctx) ;
void *(*start)(AutomationCtx *ctx);
bool (*isMatch)(AutomationCtx *ctx, void *);
bool (*canMatch)(AutomationCtx *ctx, void *data);
bool (*willAlwaysMatch)(AutomationCtx *ctx, void *state);
void* (*accept)(AutomationCtx *ctx, void *state, uint8_t byte);
void* (*acceptEof)(AutomationCtx *ct, void *state);
} AutomationFunc;
bool (*willAlwaysMatch)(AutomationCtx *ctx, void *state);
void *(*accept)(AutomationCtx *ctx, void *state, uint8_t byte);
void *(*acceptEof)(AutomationCtx *ct, void *state);
} AutomationFunc;
AutomationCtx *automCtxCreate(void *data, AutomationType atype);
void automCtxDestroy(AutomationCtx *ctx);
void automCtxDestroy(AutomationCtx *ctx);
extern AutomationFunc automFuncs[];
extern AutomationFunc automFuncs[];
#ifdef __cplusplus
}
#endif
......
#ifndef __INDEX_FST_COMM_H__
#define __INDEX_FST_COMM_H__
#include "tutil.h"
extern const uint8_t COMMON_INPUTS[];
extern char const COMMON_INPUTS_INV[];
extern const char COMMON_INPUTS_INV[];
#ifdef __cplusplus
extern "C" {
......
......@@ -22,25 +22,24 @@ extern "C" {
#include "tfile.h"
#define DefaultMem 1024*1024
#define DefaultMem 1024 * 1024
static char tmpFile[] = "./index";
typedef enum WriterType {TMemory, TFile} WriterType;
typedef enum WriterType { TMemory, TFile } WriterType;
typedef struct WriterCtx {
int (*write)(struct WriterCtx *ctx, uint8_t *buf, int len);
int (*read)(struct WriterCtx *ctx, uint8_t *buf, int len);
int (*flush)(struct WriterCtx *ctx);
WriterType type;
WriterType type;
union {
struct {
int fd;
int fd;
bool readOnly;
} file;
} file;
struct {
int32_t capa;
char *buf;
char * buf;
} mem;
};
int32_t offset;
......@@ -51,35 +50,31 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len);
static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len);
static int writeCtxDoFlush(WriterCtx *ctx);
WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int32_t capacity);
void writerCtxDestroy(WriterCtx *w);
WriterCtx *writerCtxCreate(WriterType type, const char *path, bool readOnly, int32_t capacity);
void writerCtxDestroy(WriterCtx *w);
typedef uint32_t CheckSummer;
typedef struct FstCountingWriter {
void* wrt; // wrap any writer that counts and checksum bytes written
uint64_t count;
CheckSummer summer;
void * wrt; // wrap any writer that counts and checksum bytes written
uint64_t count;
CheckSummer summer;
} FstCountingWriter;
int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t len);
int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t len);
int fstCountingWriterRead(FstCountingWriter *write, uint8_t *buf, uint32_t len);
int fstCountingWriterFlush(FstCountingWriter *write);
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write);
FstCountingWriter *fstCountingWriterCreate(void *wtr);
void fstCountingWriterDestroy(FstCountingWriter *w);
void fstCountingWriterDestroy(FstCountingWriter *w);
void fstCountingWriterPackUintIn(FstCountingWriter *writer, uint64_t n, uint8_t nBytes);
void fstCountingWriterPackUintIn(FstCountingWriter *writer, uint64_t n, uint8_t nBytes);
uint8_t fstCountingWriterPackUint(FstCountingWriter *writer, uint64_t n);
#define FST_WRITER_COUNT(writer) (writer->count)
#define FST_WRITER_INTER_WRITER(writer) (writer->wtr)
#define FST_WRITE_CHECK_SUMMER(writer) (writer->summer)
......@@ -89,5 +84,3 @@ uint8_t fstCountingWriterPackUint(FstCountingWriter *writer, uint64_t n);
#endif
#endif
......@@ -20,24 +20,24 @@
extern "C" {
#endif
#include "index_fst_util.h"
#include "index_fst_counting_writer.h"
#include "index_fst_util.h"
#define FST_BUILDER_NODE_IS_FINAL(bn) (bn->isFinal)
#define FST_BUILDER_NODE_TRANS_ISEMPTY(bn) (taosArrayGetSize(bn->trans) == 0)
#define FST_BUILDER_NODE_IS_FINAL(bn) (bn->isFinal)
#define FST_BUILDER_NODE_TRANS_ISEMPTY(bn) (taosArrayGetSize(bn->trans) == 0)
#define FST_BUILDER_NODE_FINALOUTPUT_ISZERO(bn) (bn->finalOutput == 0)
typedef struct FstTransition {
uint8_t inp; //The byte input associated with this transition.
Output out; //The output associated with this transition
CompiledAddr addr; //The address of the node that this transition points to
uint8_t inp; // The byte input associated with this transition.
Output out; // The output associated with this transition
CompiledAddr addr; // The address of the node that this transition points to
} FstTransition;
typedef struct FstBuilderNode {
bool isFinal;
Output finalOutput;
bool isFinal;
Output finalOutput;
SArray *trans; // <FstTransition>
} FstBuilderNode;
} FstBuilderNode;
FstBuilderNode *fstBuilderNodeDefault();
......@@ -45,8 +45,9 @@ FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src);
void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src);
//bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
bool fstBuilderNodeEqual(FstBuilderNode *n1, FstBuilderNode *n2);
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt,
// CompiledAddr lastAddr, CompiledAddr startAddr);
bool fstBuilderNodeEqual(FstBuilderNode *n1, FstBuilderNode *n2);
void fstBuilderNodeDestroy(FstBuilderNode *node);
......
......@@ -19,49 +19,48 @@
extern "C" {
#endif
#include "index_fst_node.h"
#include "index_fst_util.h"
#include "tarray.h"
#include "index_fst_node.h"
typedef struct FstRegistryCell {
CompiledAddr addr;
FstBuilderNode *node;
CompiledAddr addr;
FstBuilderNode *node;
} FstRegistryCell;
#define FST_REGISTRY_CELL_IS_EMPTY(cell) (cell->addr == NONE_ADDRESS)
#define FST_REGISTRY_CELL_INSERT(cell, tAddr) do {cell->addr = tAddr;} while(0)
#define FST_REGISTRY_CELL_INSERT(cell, tAddr) \
do { \
cell->addr = tAddr; \
} while (0)
//typedef struct FstRegistryCache {
// SArray *cells;
// typedef struct FstRegistryCache {
// SArray *cells;
// uint32_t start;
// uint32_t end;
//} FstRegistryCache;
typedef enum {FOUND, NOTFOUND, REJECTED} FstRegistryEntryState;
typedef enum { FOUND, NOTFOUND, REJECTED } FstRegistryEntryState;
typedef struct FstRegistryEntry {
FstRegistryEntryState state;
CompiledAddr addr;
FstRegistryCell *cell;
} FstRegistryEntry;
CompiledAddr addr;
FstRegistryCell * cell;
} FstRegistryEntry;
// Registry relation function
// Registry relation function
typedef struct FstRegistry {
SArray *table; //<FstRegistryCell>
uint64_t tableSize; // num of rows
uint64_t mruSize; // num of columns
} FstRegistry;
//
FstRegistry* fstRegistryCreate(uint64_t tableSize, uint64_t mruSize);
void fstRegistryDestroy(FstRegistry *registry);
SArray * table; //<FstRegistryCell>
uint64_t tableSize; // num of rows
uint64_t mruSize; // num of columns
} FstRegistry;
//
FstRegistry *fstRegistryCreate(uint64_t tableSize, uint64_t mruSize);
void fstRegistryDestroy(FstRegistry *registry);
FstRegistryEntry* fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNode);
void fstRegistryEntryDestroy(FstRegistryEntry *entry);
FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNode);
void fstRegistryEntryDestroy(FstRegistryEntry *entry);
#ifdef __cplusplus
}
......
......@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __INDEX_FST_UTIL_H__
#define __INDEX_FST_UTIL_H__
......@@ -21,16 +20,15 @@
extern "C" {
#endif
#include "tarray.h"
#include "index_fst_common.h"
#include "tarray.h"
typedef uint64_t FstType;
typedef uint64_t CompiledAddr;
typedef uint64_t Output;
typedef uint8_t PackSizes;
typedef uint64_t CompiledAddr;
typedef uint64_t Output;
typedef uint8_t PackSizes;
//A sentinel value used to indicate an empty final state
// A sentinel value used to indicate an empty final state
extern const CompiledAddr EMPTY_ADDRESS;
/// A sentinel value used to indicate an invalid state.
extern const CompiledAddr NONE_ADDRESS;
......@@ -38,9 +36,9 @@ extern const CompiledAddr NONE_ADDRESS;
// This version number is written to every finite state transducer created by
// this version When a finite state transducer is read, its version number is
// checked against this value.
extern const uint64_t VERSION;
// The threshold (in number of transitions) at which an index is created for
// a node's transitions. This speeds up lookup time at the expense of FST size
extern const uint64_t VERSION;
// The threshold (in number of transitions) at which an index is created for
// a node's transitions. This speeds up lookup time at the expense of FST size
extern const uint64_t TRANS_INDEX_THRESHOLD;
// high 4 bits is transition address packed size.
......@@ -48,73 +46,75 @@ extern const uint64_t TRANS_INDEX_THRESHOLD;
//
// `0` is a legal value which means there are no transitions/outputs
#define FST_SET_TRANSITION_PACK_SIZE(v, sz) do {v = (v & 0b00001111) | (sz << 4); } while(0)
#define FST_GET_TRANSITION_PACK_SIZE(v) (((v) & 0b11110000) >> 4)
#define FST_SET_OUTPUT_PACK_SIZE(v, sz) do { v = (v & 0b11110000) | sz; } while(0)
#define FST_GET_OUTPUT_PACK_SIZE(v) ((v) & 0b00001111)
#define COMMON_INPUT(idx) COMMON_INPUTS_INV[(idx) - 1]
#define COMMON_INDEX(v, max, val) do { \
val = ((uint16_t)COMMON_INPUTS[v] + 1)%256; \
val = val > max ? 0: val; \
} while(0)
//uint8_t commonInput(uint8_t idx);
//uint8_t commonIdx(uint8_t v, uint8_t max);
uint8_t packSize(uint64_t n);
#define FST_SET_TRANSITION_PACK_SIZE(v, sz) \
do { \
v = (v & 0b00001111) | (sz << 4); \
} while (0)
#define FST_GET_TRANSITION_PACK_SIZE(v) (((v)&0b11110000) >> 4)
#define FST_SET_OUTPUT_PACK_SIZE(v, sz) \
do { \
v = (v & 0b11110000) | sz; \
} while (0)
#define FST_GET_OUTPUT_PACK_SIZE(v) ((v)&0b00001111)
#define COMMON_INPUT(idx) COMMON_INPUTS_INV[(idx)-1]
#define COMMON_INDEX(v, max, val) \
do { \
val = ((uint16_t)COMMON_INPUTS[v] + 1) % 256; \
val = val > max ? 0 : val; \
} while (0)
// uint8_t commonInput(uint8_t idx);
// uint8_t commonIdx(uint8_t v, uint8_t max);
uint8_t packSize(uint64_t n);
uint64_t unpackUint64(uint8_t *ch, uint8_t sz);
uint8_t packDeltaSize(CompiledAddr nodeAddr, CompiledAddr transAddr);
CompiledAddr unpackDelta(char *data, uint64_t len, uint64_t nodeAddr);
typedef struct FstString {
uint8_t *data;
uint8_t *data;
uint32_t len;
int32_t ref;
int32_t ref;
} FstString;
typedef struct FstSlice {
FstString *str;
int32_t start;
int32_t end;
FstString *str;
int32_t start;
int32_t end;
} FstSlice;
FstSlice fstSliceCreate(uint8_t *data, uint64_t len);
FstSlice fstSliceCopy(FstSlice *s, int32_t start, int32_t end);
FstSlice fstSliceDeepCopy(FstSlice *s, int32_t start, int32_t end);
bool fstSliceIsEmpty(FstSlice *s);
int fstSliceCompare(FstSlice *s1, FstSlice *s2);
void fstSliceDestroy(FstSlice *s);
uint8_t *fstSliceData(FstSlice *s, int32_t *sz);
bool fstSliceIsEmpty(FstSlice *s);
int fstSliceCompare(FstSlice *s1, FstSlice *s2);
void fstSliceDestroy(FstSlice *s);
uint8_t *fstSliceData(FstSlice *s, int32_t *sz);
#define FST_SLICE_LEN(s) (s->end - s->start + 1)
//// stack
//// stack
//
//typedef (*StackFreeElemFn)(void *elem);
// typedef (*StackFreeElemFn)(void *elem);
//
//typedef struct FstStack {
// void *first;
// void *end;
// size_t elemSize;
// typedef struct FstStack {
// void *first;
// void *end;
// size_t elemSize;
// size_t nElem;
// StackFreeElemFn fn;
//} FstStack;
//
//
//FstStack* fstStackCreate(size_t elemSize, stackFreeElem);
//void *fstStackPush(FstStack *s, void *elem);
//void *fstStackTop(FstStack *s);
//size_t fstStackLen(FstStack *s);
//void fstStackDestory(FstStack *);
// FstStack* fstStackCreate(size_t elemSize, stackFreeElem);
// void *fstStackPush(FstStack *s, void *elem);
// void *fstStackTop(FstStack *s);
// size_t fstStackLen(FstStack *s);
// void fstStackDestory(FstStack *);
//
#ifdef __cplusplus
}
#endif
......
......@@ -17,10 +17,10 @@
#include "index.h"
#include "indexInt.h"
#include "tlockfree.h"
#include "index_tfile.h"
#include "index_fst_counting_writer.h"
#include "index_fst.h"
#include "index_fst_counting_writer.h"
#include "index_tfile.h"
#include "tlockfree.h"
#ifdef __cplusplus
extern "C" {
......@@ -29,92 +29,85 @@ extern "C" {
// tfile header
// |<---suid--->|<---version--->|<--colLen-->|<-colName->|<---type-->|
// |<-uint64_t->|<---int32_t--->|<--int32_t->|<-colLen-->|<-uint8_t->|
typedef struct TFileReadHeader {
uint64_t suid;
int32_t version;
char colName[128]; //
uint8_t colType;
char colName[128]; //
uint8_t colType;
} TFileReadHeader;
#define TFILE_HEADER_SIZE (sizeof(TFILE_HEADER_SIZE) + sizeof(uint32_t));
#define TFILE_HEADER_SIZE (sizeof(TFILE_HEADER_SIZE) + sizeof(uint32_t));
#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t))
typedef struct TFileCacheKey {
uint64_t suid;
uint8_t colType;
int32_t version;
uint64_t suid;
uint8_t colType;
int32_t version;
const char *colName;
int32_t nColName;
} TFileCacheKey;
int32_t nColName;
} TFileCacheKey;
// table cache
// refactor to LRU cache later
typedef struct TFileCache {
SHashObj *tableCache;
int16_t capacity;
// add more param
SHashObj *tableCache;
int16_t capacity;
// add more param
} TFileCache;
typedef struct TFileWriter {
FstBuilder *fb;
WriterCtx *ctx;
WriterCtx * ctx;
} TFileWriter;
typedef struct TFileReader {
T_REF_DECLARE()
Fst *fst;
WriterCtx *ctx;
} TFileReader;
T_REF_DECLARE()
Fst * fst;
WriterCtx * ctx;
TFileReadHeader header;
} TFileReader;
typedef struct IndexTFile {
char *path;
TFileCache *cache;
TFileWriter *tw;
char * path;
TFileCache * cache;
TFileWriter *tw;
} IndexTFile;
typedef struct TFileWriterOpt {
uint64_t suid;
int8_t colType;
char *colName;
int32_t nColName;
char * colName;
int32_t nColName;
int32_t version;
} TFileWriterOpt;
} TFileWriterOpt;
typedef struct TFileReaderOpt {
uint64_t suid;
char *colName;
int32_t nColName;
uint64_t suid;
char * colName;
int32_t nColName;
} TFileReaderOpt;
// tfile cache, manage tindex reader
TFileCache *tfileCacheCreate(const char *path);
void tfileCacheDestroy(TFileCache *tcache);
TFileReader* tfileCacheGet(TFileCache *tcache, TFileCacheKey *key);
void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader);
TFileReader* tfileReaderCreate();
void TFileReaderDestroy(TFileReader *reader);
// tfile cache, manage tindex reader
TFileCache * tfileCacheCreate(const char *path);
void tfileCacheDestroy(TFileCache *tcache);
TFileReader *tfileCacheGet(TFileCache *tcache, TFileCacheKey *key);
void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader);
TFileReader *tfileReaderCreate();
void TFileReaderDestroy(TFileReader *reader);
TFileWriter *tfileWriterCreate(const char *suid, const char *colName);
void tfileWriterDestroy(TFileWriter *tw);
void tfileWriterDestroy(TFileWriter *tw);
//
//
IndexTFile *indexTFileCreate(const char *path);
int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid);
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result);
int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid);
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result);
#ifdef __cplusplus
}
#endif
#endif
......@@ -19,33 +19,32 @@
extern "C" {
#endif
#define SERIALIZE_MEM_TO_BUF(buf, key, mem) \
do { \
memcpy((void *)buf, (void *)(&key->mem), sizeof(key->mem)); \
buf += sizeof(key->mem); \
#define SERIALIZE_MEM_TO_BUF(buf, key, mem) \
do { \
memcpy((void *)buf, (void *)(&key->mem), sizeof(key->mem)); \
buf += sizeof(key->mem); \
} while (0)
#define SERIALIZE_STR_MEM_TO_BUF(buf, key, mem, len) \
do { \
memcpy((void *)buf, (void *)key->mem, len); \
buf += len; \
do { \
memcpy((void *)buf, (void *)key->mem, len); \
buf += len; \
} while (0)
#define SERIALIZE_VAR_TO_BUF(buf, var, type) \
do { \
type c = var; \
assert(sizeof(var) == sizeof(type));\
memcpy((void *)buf, (void *)&c, sizeof(c)); \
buf += sizeof(c); \
#define SERIALIZE_VAR_TO_BUF(buf, var, type) \
do { \
type c = var; \
assert(sizeof(var) == sizeof(type)); \
memcpy((void *)buf, (void *)&c, sizeof(c)); \
buf += sizeof(c); \
} while (0)
#define SERIALIZE_STR_VAR_TO_BUF(buf, var, len) \
do { \
memcpy((void *)buf, (void *)var, len); \
buf += len;\
do { \
memcpy((void *)buf, (void *)var, len); \
buf += len; \
} while (0)
#ifdef __cplusplus
}
#endif
......
......@@ -26,105 +26,108 @@
static int uidCompare(const void *a, const void *b) {
uint64_t u1 = *(uint64_t *)a;
uint64_t u2 = *(uint64_t *)b;
if (u1 == u2) { return 0; }
else { return u1 < u2 ? -1 : 1; }
if (u1 == u2) {
return 0;
} else {
return u1 < u2 ? -1 : 1;
}
}
typedef struct SIdxColInfo {
int colId; // generated by index internal
int colId; // generated by index internal
int cVersion;
} SIdxColInfo;
} SIdxColInfo;
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
static void indexInit();
static void indexInit();
static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *term, SArray **result);
static int indexMergeCacheIntoTindex(SIndex *sIdx);
static void indexInterResultsDestroy(SArray *results);
static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *finalResult);
static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *finalResult);
int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) {
pthread_once(&isInit, indexInit);
SIndex *sIdx = calloc(1, sizeof(SIndex));
if (sIdx == NULL) { return -1; }
if (sIdx == NULL) {
return -1;
}
#ifdef USE_LUCENE
index_t *index = index_open(path);
#ifdef USE_LUCENE
index_t *index = index_open(path);
sIdx->index = index;
#endif
sIdx->cache = (void*)indexCacheCreate();
sIdx->tindex = NULL;
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->colId = 1;
sIdx->cVersion = 1;
sIdx->cache = (void *)indexCacheCreate();
sIdx->tindex = NULL;
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->colId = 1;
sIdx->cVersion = 1;
pthread_mutex_init(&sIdx->mtx, NULL);
*index = sIdx;
return 0;
*index = sIdx;
return 0;
}
void indexClose(SIndex *sIdx) {
#ifdef USE_LUCENE
index_close(sIdex->index);
#ifdef USE_LUCENE
index_close(sIdex->index);
sIdx->index = NULL;
#endif
#ifdef USE_INVERTED_INDEX
indexCacheDestroy(sIdx->cache);
taosHashCleanup(sIdx->colObj);
taosHashCleanup(sIdx->colObj);
pthread_mutex_destroy(&sIdx->mtx);
#endif
free(sIdx);
free(sIdx);
return;
}
int indexPut(SIndex *index, SIndexMultiTerm * fVals, uint64_t uid) {
#ifdef USE_LUCENE
index_document_t *doc = index_document_create();
char buf[16] = {0};
sprintf(buf, "%d", uid);
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm *p = taosArrayGetP(fVals, i);
index_document_add(doc, (const char *)(p->key), p->nKey, (const char *)(p->val), p->nVal, 1);
}
index_document_add(doc, NULL, 0, buf, strlen(buf), 0);
int indexPut(SIndex *index, SIndexMultiTerm *fVals, uint64_t uid) {
#ifdef USE_LUCENE
index_document_t *doc = index_document_create();
char buf[16] = {0};
sprintf(buf, "%d", uid);
index_put(index->index, doc);
index_document_destroy(doc);
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm *p = taosArrayGetP(fVals, i);
index_document_add(doc, (const char *)(p->key), p->nKey, (const char *)(p->val), p->nVal, 1);
}
index_document_add(doc, NULL, 0, buf, strlen(buf), 0);
index_put(index->index, doc);
index_document_destroy(doc);
#endif
#ifdef USE_INVERTED_INDEX
//TODO(yihao): reduce the lock range
pthread_mutex_lock(&index->mtx);
// TODO(yihao): reduce the lock range
pthread_mutex_lock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm *p = taosArrayGetP(fVals, i);
SIndexTerm * p = taosArrayGetP(fVals, i);
SIdxColInfo *fi = taosHashGet(index->colObj, p->colName, p->nColName);
if (fi == NULL) {
SIdxColInfo tfi = {.colId = index->colId};
index->cVersion++;
index->cVersion++;
index->colId++;
taosHashPut(index->colObj, p->colName, p->nColName, &tfi, sizeof(tfi));
taosHashPut(index->colObj, p->colName, p->nColName, &tfi, sizeof(tfi));
} else {
//TODO, del
// TODO, del
}
}
}
pthread_mutex_unlock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm *p = taosArrayGetP(fVals, i);
SIndexTerm * p = taosArrayGetP(fVals, i);
SIdxColInfo *fi = taosHashGet(index->colObj, p->colName, p->nColName);
assert(fi != NULL);
int32_t colId = fi->colId;
assert(fi != NULL);
int32_t colId = fi->colId;
int32_t version = index->cVersion;
int ret = indexCachePut(index->cache, p, colId, version, uid);
int ret = indexCachePut(index->cache, p, colId, version, uid);
if (ret != 0) {
return ret;
return ret;
}
}
#endif
......@@ -132,29 +135,29 @@ int indexPut(SIndex *index, SIndexMultiTerm * fVals, uint64_t uid) {
return 0;
}
int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result) {
#ifdef USE_LUCENE
EIndexOperatorType opera = multiQuerys->opera;
#ifdef USE_LUCENE
EIndexOperatorType opera = multiQuerys->opera;
int nQuery = taosArrayGetSize(multiQuerys->query);
int nQuery = taosArrayGetSize(multiQuerys->query);
char **fields = malloc(sizeof(char *) * nQuery);
char **keys = malloc(sizeof(char *) * nQuery);
int *types = malloc(sizeof(int) * nQuery);
char **keys = malloc(sizeof(char *) * nQuery);
int * types = malloc(sizeof(int) * nQuery);
for (int i = 0; i < nQuery; i++) {
SIndexTermQuery *p = taosArrayGet(multiQuerys->query, i);
SIndexTerm *term = p->field_value;
fields[i] = calloc(1, term->nKey + 1);
keys[i] = calloc(1, term->nVal + 1);
memcpy(fields[i], term->key, term->nKey);
memcpy(keys[i], term->val, term->nVal);
types[i] = (int)(p->type);
}
int *tResult = NULL;
int tsz= 0;
SIndexTermQuery *p = taosArrayGet(multiQuerys->query, i);
SIndexTerm * term = p->field_value;
fields[i] = calloc(1, term->nKey + 1);
keys[i] = calloc(1, term->nVal + 1);
memcpy(fields[i], term->key, term->nKey);
memcpy(keys[i], term->val, term->nVal);
types[i] = (int)(p->type);
}
int *tResult = NULL;
int tsz = 0;
index_multi_search(index->index, (const char **)fields, (const char **)keys, types, nQuery, opera, &tResult, &tsz);
for (int i = 0; i < tsz; i++) {
taosArrayPush(result, &tResult[i]);
}
......@@ -169,57 +172,55 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result
#endif
#ifdef USE_INVERTED_INDEX
EIndexOperatorType opera = multiQuerys->opera; // relation of querys
EIndexOperatorType opera = multiQuerys->opera; // relation of querys
SArray *interResults = taosArrayInit(4, POINTER_BYTES);
int nQuery = taosArrayGetSize(multiQuerys->query);
int nQuery = taosArrayGetSize(multiQuerys->query);
for (size_t i = 0; i < nQuery; i++) {
SIndexTermQuery *qTerm = taosArrayGet(multiQuerys->query, i);
SArray *tResult = NULL;
indexTermSearch(index, qTerm, &tResult);
taosArrayPush(interResults, (void *)&tResult);
}
SIndexTermQuery *qTerm = taosArrayGet(multiQuerys->query, i);
SArray * tResult = NULL;
indexTermSearch(index, qTerm, &tResult);
taosArrayPush(interResults, (void *)&tResult);
}
indexMergeFinalResults(interResults, opera, result);
indexInterResultsDestroy(interResults);
#endif
return 1;
}
int indexDelete(SIndex *index, SIndexMultiTermQuery *query) {
#ifdef USE_INVERTED_INDEX
#endif
return 1;
}
int indexRebuild(SIndex *index, SIndexOpts *opts) {
int indexRebuild(SIndex *index, SIndexOpts *opts){
#ifdef USE_INVERTED_INDEX
#endif
}
SIndexOpts *indexOptsCreate() {
#ifdef USE_LUCENE
#ifdef USE_LUCENE
#endif
return NULL;
return NULL;
}
void indexOptsDestroy(SIndexOpts *opts) {
#ifdef USE_LUCENE
void indexOptsDestroy(SIndexOpts *opts){
#ifdef USE_LUCENE
#endif
}
/*
* @param: oper
*
*/
} /*
* @param: oper
*
*/
SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType opera) {
SIndexMultiTermQuery *p = (SIndexMultiTermQuery *)malloc(sizeof(SIndexMultiTermQuery));
if (p == NULL) { return NULL; }
p->opera = opera;
p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
if (p == NULL) {
return NULL;
}
p->opera = opera;
p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
return p;
}
void indexMultiTermQueryDestroy(SIndexMultiTermQuery *pQuery) {
......@@ -227,25 +228,27 @@ void indexMultiTermQueryDestroy(SIndexMultiTermQuery *pQuery) {
SIndexTermQuery *p = (SIndexTermQuery *)taosArrayGet(pQuery->query, i);
indexTermDestroy(p->term);
}
taosArrayDestroy(pQuery->query);
taosArrayDestroy(pQuery->query);
free(pQuery);
};
int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm *term, EIndexQueryType qType){
SIndexTermQuery q = {.qType = qType, .term = term};
int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm *term, EIndexQueryType qType) {
SIndexTermQuery q = {.qType = qType, .term = term};
taosArrayPush(pQuery->query, &q);
return 0;
}
SIndexTerm *indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char *colName,
int32_t nColName, const char *colVal, int32_t nColVal) {
SIndexTerm *t = (SIndexTerm *)calloc(1, (sizeof(SIndexTerm)));
if (t == NULL) {
return NULL;
}
SIndexTerm *indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char *colName, int32_t nColName, const char *colVal, int32_t nColVal) {
SIndexTerm *t = (SIndexTerm *)calloc(1, (sizeof(SIndexTerm)));
if (t == NULL) { return NULL; }
t->suid = suid;
t->operType= oper;
t->suid = suid;
t->operType = oper;
t->colType = colType;
t->colName = (char *)calloc(1, nColName + 1);
t->colName = (char *)calloc(1, nColName + 1);
memcpy(t->colName, colName, nColName);
t->nColName = nColName;
......@@ -258,15 +261,13 @@ void indexTermDestroy(SIndexTerm *p) {
free(p->colName);
free(p->colVal);
free(p);
}
SIndexMultiTerm *indexMultiTermCreate() {
return taosArrayInit(4, sizeof(SIndexTerm *));
}
SIndexMultiTerm *indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm *)); }
int indexMultiTermAdd(SIndexMultiTerm *terms, SIndexTerm *term) {
taosArrayPush(terms, &term);
return 0;
taosArrayPush(terms, &term);
return 0;
}
void indexMultiTermDestroy(SIndexMultiTerm *terms) {
for (int32_t i = 0; i < taosArrayGetSize(terms); i++) {
......@@ -277,40 +278,40 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms) {
}
void indexInit() {
//do nothing
// do nothing
}
static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *query, SArray **result) {
int32_t version = -1;
int16_t colId = -1;
int32_t version = -1;
int16_t colId = -1;
SIdxColInfo *colInfo = NULL;
SIndexTerm *term = query->term;
const char *colName = term->colName;
int32_t nColName = term->nColName;
int32_t nColName = term->nColName;
pthread_mutex_lock(&sIdx->mtx);
colInfo = taosHashGet(sIdx->colObj, colName, nColName);
pthread_mutex_lock(&sIdx->mtx);
colInfo = taosHashGet(sIdx->colObj, colName, nColName);
if (colInfo == NULL) {
pthread_mutex_unlock(&sIdx->mtx);
return -1;
pthread_mutex_unlock(&sIdx->mtx);
return -1;
}
colId = colInfo->colId;
colId = colInfo->colId;
version = colInfo->cVersion;
pthread_mutex_unlock(&sIdx->mtx);
pthread_mutex_unlock(&sIdx->mtx);
*result = taosArrayInit(4, sizeof(uint64_t));
//TODO: iterator mem and tidex
STermValueType s;
// TODO: iterator mem and tidex
STermValueType s;
if (0 == indexCacheSearch(sIdx->cache, query, colId, version, *result, &s)) {
if (s == kTypeDeletion) {
indexInfo("col: %s already drop by other opera", term->colName);
// coloum already drop by other oper, no need to query tindex
// coloum already drop by other oper, no need to query tindex
return 0;
} else {
if (0 != indexTFileSearch(sIdx->tindex, query, *result)) {
indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
return -1;
}
indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
return -1;
}
}
} else {
indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
......@@ -319,39 +320,40 @@ static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *query, SArray **result
return 0;
}
static void indexInterResultsDestroy(SArray *results) {
if (results == NULL) { return; }
if (results == NULL) {
return;
}
size_t sz = taosArrayGetSize(results);
for (size_t i = 0; i < sz; i++) {
SArray *p = taosArrayGetP(results, i);
taosArrayDestroy(p);
}
taosArrayDestroy(p);
}
taosArrayDestroy(results);
}
static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *fResults) {
//refactor, merge interResults into fResults by oType
SArray *first = taosArrayGetP(interResults, 0);
taosArraySort(first, uidCompare);
// refactor, merge interResults into fResults by oType
SArray *first = taosArrayGetP(interResults, 0);
taosArraySort(first, uidCompare);
taosArrayRemoveDuplicate(first, uidCompare, NULL);
if (oType == MUST) {
// just one column index, enhance later
taosArrayAddAll(fResults, first);
// just one column index, enhance later
taosArrayAddAll(fResults, first);
} else if (oType == SHOULD) {
// just one column index, enhance later
taosArrayAddAll(fResults, first);
// just one column index, enhance later
taosArrayAddAll(fResults, first);
// tag1 condistion || tag2 condition
} else if (oType == NOT) {
// just one column index, enhance later
taosArrayAddAll(fResults, first);
// not use currently
// just one column index, enhance later
taosArrayAddAll(fResults, first);
// not use currently
}
return 0;
}
static int indexMergeCacheIntoTindex(SIndex *sIdx) {
if (sIdx == NULL) {
return -1;
return -1;
}
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
return 0;
}
......@@ -14,148 +14,154 @@
*/
#include "index_cache.h"
#include "tcompare.h"
#include "index_util.h"
#include "tcompare.h"
#define MAX_INDEX_KEY_LEN 256// test only, change later
#define MAX_INDEX_KEY_LEN 256 // test only, change later
// ref index_cache.h:22
#define CACHE_KEY_LEN(p) (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
// ref index_cache.h:22
#define CACHE_KEY_LEN(p) \
(sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + \
sizeof(p->operType))
static char* getIndexKey(const void *pData) {
return NULL;
}
static char * getIndexKey(const void *pData) { return NULL; }
static int32_t compareKey(const void *l, const void *r) {
char *lp = (char *)l;
char *rp = (char *)r;
// skip total len, not compare
int32_t ll, rl; // len
int32_t ll, rl; // len
memcpy(&ll, lp, sizeof(int32_t));
memcpy(&rl, rp, sizeof(int32_t));
lp += sizeof(int32_t);
lp += sizeof(int32_t);
rp += sizeof(int32_t);
// compare field id
int16_t lf, rf; // field id
int16_t lf, rf; // field id
memcpy(&lf, lp, sizeof(lf));
memcpy(&rf, rp, sizeof(rf));
if (lf != rf) {
return lf < rf ? -1: 1;
return lf < rf ? -1 : 1;
}
lp += sizeof(lf);
rp += sizeof(rf);
// compare field type
int8_t lft, rft;
int8_t lft, rft;
memcpy(&lft, lp, sizeof(lft));
memcpy(&rft, rp, sizeof(rft));
lp += sizeof(lft);
rp += sizeof(rft);
assert(rft == rft);
// skip value len
// skip value len
int32_t lfl, rfl;
memcpy(&lfl, lp, sizeof(lfl));
memcpy(&rfl, rp, sizeof(rfl));
memcpy(&lfl, lp, sizeof(lfl));
memcpy(&rfl, rp, sizeof(rfl));
lp += sizeof(lfl);
rp += sizeof(rfl);
// compare value
// compare value
int32_t i, j;
for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) {
if (lp[i] == rp[j]) { continue; }
else { return lp[i] < rp[j] ? -1 : 1;}
if (lp[i] == rp[j]) {
continue;
} else {
return lp[i] < rp[j] ? -1 : 1;
}
}
if (i < lfl) {
return 1;
} else if (j < rfl) {
return -1;
}
if (i < lfl) { return 1;}
else if (j < rfl) { return -1; }
lp += lfl;
rp += rfl;
rp += rfl;
// skip uid
// skip uid
uint64_t lu, ru;
memcpy(&lu, lp, sizeof(lu));
memcpy(&lu, lp, sizeof(lu));
memcpy(&ru, rp, sizeof(ru));
lp += sizeof(lu);
rp += sizeof(ru);
// compare version, desc order
int32_t lv, rv;
memcpy(&lv, lp, sizeof(lv));
memcpy(&rv, rp, sizeof(rv));
if (lv != rv) {
return lv > rv ? -1 : 1;
}
return lv > rv ? -1 : 1;
}
lp += sizeof(lv);
rp += sizeof(rv);
// not care item type
return 0;
}
return 0;
}
IndexCache *indexCacheCreate() {
IndexCache *cache = calloc(1, sizeof(IndexCache));
cache->skiplist = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
cache->skiplist = tSkipListCreate(
MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
return cache;
}
void indexCacheDestroy(void *cache) {
IndexCache *pCache = cache;
if (pCache == NULL) { return; }
IndexCache *pCache = cache;
if (pCache == NULL) {
return;
}
tSkipListDestroy(pCache->skiplist);
free(pCache);
}
int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, uint64_t uid) {
if (cache == NULL) { return -1;}
if (cache == NULL) {
return -1;
}
IndexCache *pCache = cache;
// encode data
int32_t total = CACHE_KEY_LEN(term);
char *buf = calloc(1, total);
char *p = buf;
int32_t total = CACHE_KEY_LEN(term);
char * buf = calloc(1, total);
char * p = buf;
SERIALIZE_VAR_TO_BUF(p, total,int32_t);
SERIALIZE_VAR_TO_BUF(p, total, int32_t);
SERIALIZE_VAR_TO_BUF(p, colId, int16_t);
SERIALIZE_MEM_TO_BUF(p, term, colType);
SERIALIZE_MEM_TO_BUF(p, term, nColVal);
SERIALIZE_STR_MEM_TO_BUF(p, term, colVal, term->nColVal);
SERIALIZE_MEM_TO_BUF(p, term, nColVal);
SERIALIZE_STR_MEM_TO_BUF(p, term, colVal, term->nColVal);
SERIALIZE_VAR_TO_BUF(p, version, int32_t);
SERIALIZE_VAR_TO_BUF(p, uid, uint64_t);
SERIALIZE_VAR_TO_BUF(p, uid, uint64_t);
SERIALIZE_MEM_TO_BUF(p, term, operType);
tSkipListPut(pCache->skiplist, (void *)buf);
tSkipListPut(pCache->skiplist, (void *)buf);
return 0;
// encode end
}
int indexCacheDel(void *cache, int32_t fieldId, const char *fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
IndexCache *pCache = cache;
return 0;
}
int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s) {
if (cache == NULL) { return -1; }
IndexCache *pCache = cache;
SIndexTerm *term = query->term;
EIndexQueryType qtype = query->qType;
int32_t keyLen = CACHE_KEY_LEN(term);
int indexCacheSearch(
void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s) {
if (cache == NULL) {
return -1;
}
IndexCache * pCache = cache;
SIndexTerm * term = query->term;
EIndexQueryType qtype = query->qType;
int32_t keyLen = CACHE_KEY_LEN(term);
char *buf = calloc(1, keyLen);
if (qtype == QUERY_TERM) {
} else if (qtype == QUERY_PREFIX) {
} else if (qtype == QUERY_SUFFIX) {
} else if (qtype == QUERY_REGEX) {
}
return 0;
}
此差异已折叠。
......@@ -15,44 +15,49 @@
#include "index_fst_automation.h"
StartWithStateValue *startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void *val) {
StartWithStateValue *nsv = calloc(1, sizeof(StartWithStateValue));
if (nsv == NULL) { return NULL; }
if (nsv == NULL) {
return NULL;
}
nsv->kind = kind;
nsv->type = ty;
if (ty == FST_INT) {
nsv->val = *(int *)val;
} else if (ty == FST_CHAR) {
size_t len = strlen((char *)val);
nsv->ptr = (char *)calloc(1, len + 1);
size_t len = strlen((char *)val);
nsv->ptr = (char *)calloc(1, len + 1);
memcpy(nsv->ptr, val, len);
} else if (ty == FST_ARRAY) {
//TODO,
//nsv->arr = taosArrayFromList()
// TODO,
// nsv->arr = taosArrayFromList()
}
return nsv;
}
void startWithStateValueDestroy(void *val) {
StartWithStateValue *sv = (StartWithStateValue *)val;
if (sv == NULL) { return; }
if (sv == NULL) {
return;
}
if (sv->type == FST_INT) {
//
//
} else if (sv->type == FST_CHAR) {
free(sv->ptr);
} else if (sv->type == FST_ARRAY) {
taosArrayDestroy(sv->arr);
}
free(sv);
free(sv);
}
StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv) {
StartWithStateValue *nsv = calloc(1, sizeof(StartWithStateValue));
if (nsv == NULL) { return NULL; }
if (nsv == NULL) {
return NULL;
}
nsv->kind = sv->kind;
nsv->type= sv->type;
nsv->type = sv->type;
if (nsv->type == FST_INT) {
nsv->val = sv->val;
} else if (nsv->type == FST_CHAR) {
......@@ -64,93 +69,67 @@ StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv) {
return nsv;
}
// prefix query, impl later
static void* prefixStart(AutomationCtx *ctx) {
static void *prefixStart(AutomationCtx *ctx) {
StartWithStateValue *data = (StartWithStateValue *)(ctx->stdata);
return startWithStateValueDump(data);
return startWithStateValueDump(data);
};
static bool prefixIsMatch(AutomationCtx *ctx, void *sv) {
StartWithStateValue* ssv = (StartWithStateValue *)sv;
return ssv->val == strlen(ctx->data);
}
static bool prefixCanMatch(AutomationCtx *ctx, void *sv) {
StartWithStateValue* ssv = (StartWithStateValue *)sv;
return ssv->val >= 0;
StartWithStateValue *ssv = (StartWithStateValue *)sv;
return ssv->val == strlen(ctx->data);
}
static bool prefixWillAlwaysMatch(AutomationCtx *ctx, void *state) {
return true;
static bool prefixCanMatch(AutomationCtx *ctx, void *sv) {
StartWithStateValue *ssv = (StartWithStateValue *)sv;
return ssv->val >= 0;
}
static void* prefixAccept(AutomationCtx *ctx, void *state, uint8_t byte) {
StartWithStateValue* ssv = (StartWithStateValue *)state;
if (ssv == NULL || ctx == NULL) {return NULL;}
static bool prefixWillAlwaysMatch(AutomationCtx *ctx, void *state) { return true; }
static void *prefixAccept(AutomationCtx *ctx, void *state, uint8_t byte) {
StartWithStateValue *ssv = (StartWithStateValue *)state;
if (ssv == NULL || ctx == NULL) {
return NULL;
}
char *data = ctx->data;
if (ssv->kind == Done) {
return startWithStateValueCreate(Done, FST_INT, &ssv->val);
}
if ((strlen(data) > ssv->val) && data[ssv->val] == byte) {
int val = ssv->val + 1;
int val = ssv->val + 1;
StartWithStateValue *nsv = startWithStateValueCreate(Running, FST_INT, &val);
if (prefixIsMatch(ctx, nsv)) {
nsv->kind = Done;
} else {
nsv->kind = Running;
}
}
return nsv;
}
return NULL;
}
static void* prefixAcceptEof(AutomationCtx *ctx, void *state) {
}
return NULL;
}
static void *prefixAcceptEof(AutomationCtx *ctx, void *state) { return NULL; }
// pattern query, impl later
static void* patternStart(AutomationCtx *ctx) {
return NULL;
}
static bool patternIsMatch(AutomationCtx *ctx, void *data) {
return true;
}
static bool patternCanMatch(AutomationCtx *ctx, void *data) {
return true;
}
static bool patternWillAlwaysMatch(AutomationCtx *ctx, void *state) {
return true;
}
static void *patternStart(AutomationCtx *ctx) { return NULL; }
static bool patternIsMatch(AutomationCtx *ctx, void *data) { return true; }
static bool patternCanMatch(AutomationCtx *ctx, void *data) { return true; }
static bool patternWillAlwaysMatch(AutomationCtx *ctx, void *state) { return true; }
static void* patternAccept(AutomationCtx *ctx, void *state, uint8_t byte) {
return NULL;
}
static void *patternAccept(AutomationCtx *ctx, void *state, uint8_t byte) { return NULL; }
static void* patternAcceptEof(AutomationCtx *ctx, void *state) {
return NULL;
}
static void *patternAcceptEof(AutomationCtx *ctx, void *state) { return NULL; }
AutomationFunc automFuncs[] = {{
prefixStart,
prefixIsMatch,
prefixCanMatch,
prefixWillAlwaysMatch,
prefixAccept,
prefixAcceptEof
},
{
patternStart,
patternIsMatch,
patternCanMatch,
patternWillAlwaysMatch,
patternAccept,
patternAcceptEof
}
// add more search type
AutomationFunc automFuncs[] = {
{prefixStart, prefixIsMatch, prefixCanMatch, prefixWillAlwaysMatch, prefixAccept, prefixAcceptEof},
{patternStart, patternIsMatch, patternCanMatch, patternWillAlwaysMatch, patternAccept, patternAcceptEof}
// add more search type
};
AutomationCtx* automCtxCreate(void *data,AutomationType atype) {
AutomationCtx *automCtxCreate(void *data, AutomationType atype) {
AutomationCtx *ctx = calloc(1, sizeof(AutomationCtx));
if (ctx == NULL) { return NULL; }
if (ctx == NULL) {
return NULL;
}
StartWithStateValue *sv = NULL;
if (atype == AUTOMATION_PREFIX) {
......@@ -158,22 +137,21 @@ AutomationCtx* automCtxCreate(void *data,AutomationType atype) {
sv = startWithStateValueCreate(Running, FST_INT, &val);
ctx->stdata = (void *)sv;
} else if (atype == AUTMMATION_MATCH) {
} else {
// add more search type
}
char* src = (char *)data;
char * src = (char *)data;
size_t len = strlen(src);
char* dst = (char *)malloc(len * sizeof(char) + 1);
char * dst = (char *)malloc(len * sizeof(char) + 1);
memcpy(dst, src, len);
dst[len] = 0;
ctx->data = dst;
ctx->type = atype;
ctx->stdata = (void *)sv;
return ctx;
}
ctx->data = dst;
ctx->type = atype;
ctx->stdata = (void *)sv;
return ctx;
}
void automCtxDestroy(AutomationCtx *ctx) {
startWithStateValueDestroy(ctx->stdata);
free(ctx->data);
......
......@@ -12,10 +12,10 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tutil.h"
#include "index_fst_counting_writer.h"
#include "indexInt.h"
#include "index_fst_util.h"
#include "index_fst_counting_writer.h"
#include "tutil.h"
static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) {
if (ctx->offset + len > ctx->limit) {
......@@ -25,13 +25,13 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) {
if (ctx->type == TFile) {
assert(len == tfWrite(ctx->file.fd, buf, len));
} else {
memcpy(ctx->mem.buf+ ctx->offset, buf, len);
}
memcpy(ctx->mem.buf + ctx->offset, buf, len);
}
ctx->offset += len;
return len;
}
static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) {
int nRead = 0;
int nRead = 0;
if (ctx->type == TFile) {
nRead = tfRead(ctx->file.fd, buf, len);
} else {
......@@ -40,110 +40,116 @@ static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) {
ctx->offset += nRead;
return nRead;
}
}
static int writeCtxDoFlush(WriterCtx *ctx) {
if (ctx->type == TFile) {
//tfFsync(ctx->fd);
//tfFlush(ctx->file.fd);
// tfFsync(ctx->fd);
// tfFlush(ctx->file.fd);
} else {
// do nothing
}
return 1;
}
WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int32_t capacity) {
WriterCtx *writerCtxCreate(WriterType type, const char *path, bool readOnly, int32_t capacity) {
WriterCtx *ctx = calloc(1, sizeof(WriterCtx));
if (ctx == NULL) { return NULL; }
if (ctx == NULL) {
return NULL;
}
ctx->type = type;
if (ctx->type == TFile) {
// ugly code, refactor later
ctx->file.readOnly = readOnly;
if (readOnly == false) {
ctx->file.fd = tfOpenCreateWriteAppend(tmpFile);
ctx->file.fd = tfOpenCreateWriteAppend(tmpFile);
} else {
ctx->file.fd = tfOpenReadWrite(tmpFile);
}
}
if (ctx->file.fd < 0) {
goto END;
indexError("open file error %d", errno);
indexError("open file error %d", errno);
}
} else if (ctx->type == TMemory) {
ctx->mem.buf = calloc(1, sizeof(char) * capacity);
ctx->mem.capa = capacity;
}
ctx->mem.buf = calloc(1, sizeof(char) * capacity);
ctx->mem.capa = capacity;
}
ctx->write = writeCtxDoWrite;
ctx->read = writeCtxDoRead;
ctx->read = writeCtxDoRead;
ctx->flush = writeCtxDoFlush;
ctx->offset = 0;
ctx->limit = capacity;
ctx->limit = capacity;
return ctx;
END:
if (ctx->type == TMemory) { free(ctx->mem.buf); }
if (ctx->type == TMemory) {
free(ctx->mem.buf);
}
free(ctx);
}
void writerCtxDestroy(WriterCtx *ctx) {
if (ctx->type == TMemory) {
free(ctx->mem.buf);
} else {
tfClose(ctx->file.fd);
tfClose(ctx->file.fd);
}
free(ctx);
}
FstCountingWriter *fstCountingWriterCreate(void *wrt) {
FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter));
if (cw == NULL) { return NULL; }
FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter));
if (cw == NULL) {
return NULL;
}
cw->wrt = wrt;
//(void *)(writerCtxCreate(TFile, readOnly));
return cw;
//(void *)(writerCtxCreate(TFile, readOnly));
return cw;
}
void fstCountingWriterDestroy(FstCountingWriter *cw) {
// free wrt object: close fd or free mem
// free wrt object: close fd or free mem
fstCountingWriterFlush(cw);
//writerCtxDestroy((WriterCtx *)(cw->wrt));
// writerCtxDestroy((WriterCtx *)(cw->wrt));
free(cw);
}
int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t len) {
if (write == NULL) { return 0; }
// update checksum
if (write == NULL) {
return 0;
}
// update checksum
// write data to file/socket or mem
WriterCtx *ctx = write->wrt;
int nWrite = ctx->write(ctx, buf, len);
int nWrite = ctx->write(ctx, buf, len);
assert(nWrite == len);
write->count += len;
return len;
}
return len;
}
int fstCountingWriterRead(FstCountingWriter *write, uint8_t *buf, uint32_t len) {
if (write == NULL) { return 0; }
if (write == NULL) {
return 0;
}
WriterCtx *ctx = write->wrt;
int nRead = ctx->read(ctx, buf, len);
//assert(nRead == len);
return nRead;
}
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) {
return 0;
int nRead = ctx->read(ctx, buf, len);
// assert(nRead == len);
return nRead;
}
int fstCountingWriterFlush(FstCountingWriter *write) {
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) { return 0; }
int fstCountingWriterFlush(FstCountingWriter *write) {
WriterCtx *ctx = write->wrt;
ctx->flush(ctx);
//write->wtr->flush
// write->wtr->flush
return 1;
}
void fstCountingWriterPackUintIn(FstCountingWriter *writer, uint64_t n, uint8_t nBytes) {
void fstCountingWriterPackUintIn(FstCountingWriter *writer, uint64_t n, uint8_t nBytes) {
assert(1 <= nBytes && nBytes <= 8);
uint8_t *buf = calloc(8, sizeof(uint8_t));
uint8_t *buf = calloc(8, sizeof(uint8_t));
for (uint8_t i = 0; i < nBytes; i++) {
buf[i] = (uint8_t)n;
buf[i] = (uint8_t)n;
n = n >> 8;
}
fstCountingWriterWrite(writer, buf, nBytes);
......@@ -154,7 +160,5 @@ void fstCountingWriterPackUintIn(FstCountingWriter *writer, uint64_t n, uint8_t
uint8_t fstCountingWriterPackUint(FstCountingWriter *writer, uint64_t n) {
uint8_t nBytes = packSize(n);
fstCountingWriterPackUintIn(writer, n, nBytes);
return nBytes;
}
return nBytes;
}
......@@ -16,30 +16,34 @@
FstBuilderNode *fstBuilderNodeDefault() {
FstBuilderNode *bn = malloc(sizeof(FstBuilderNode));
bn->isFinal = false;
bn->finalOutput = 0;
bn->trans = taosArrayInit(16, sizeof(FstTransition));
bn->isFinal = false;
bn->finalOutput = 0;
bn->trans = taosArrayInit(16, sizeof(FstTransition));
return bn;
}
void fstBuilderNodeDestroy(FstBuilderNode *node) {
if (node == NULL) { return; }
if (node == NULL) {
return;
}
taosArrayDestroy(node->trans);
free(node);
}
}
bool fstBuilderNodeEqual(FstBuilderNode *n1, FstBuilderNode *n2) {
if (n1 == n2) { return true; }
if (n1 == NULL || n2 == NULL ) {
if (n1 == n2) {
return true;
}
if (n1 == NULL || n2 == NULL) {
return false;
}
if (n1->isFinal != n2->isFinal || n1->finalOutput != n2->finalOutput) {
return false;
}
size_t s1 = n1->trans? taosArrayGetSize(n1->trans): 0;
size_t s2 = n2->trans? taosArrayGetSize(n2->trans): 0;
if (s1 != s2) {
size_t s1 = n1->trans ? taosArrayGetSize(n1->trans) : 0;
size_t s2 = n2->trans ? taosArrayGetSize(n2->trans) : 0;
if (s1 != s2) {
return false;
}
for (size_t i = 0; i < s1; i++) {
......@@ -47,69 +51,70 @@ bool fstBuilderNodeEqual(FstBuilderNode *n1, FstBuilderNode *n2) {
FstTransition *t2 = taosArrayGet(n2->trans, i);
if (t1->inp != t2->inp || t1->out != t2->out || t1->addr != t2->addr) {
return false;
}
}
}
return true;
}
FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src) {
FstBuilderNode *node = malloc(sizeof(FstBuilderNode));
if (node == NULL) { return NULL; }
FstBuilderNode *node = malloc(sizeof(FstBuilderNode));
if (node == NULL) {
return NULL;
}
//
size_t sz = taosArrayGetSize(src->trans);
//
size_t sz = taosArrayGetSize(src->trans);
SArray *trans = taosArrayInit(sz, sizeof(FstTransition));
for (size_t i = 0; i < sz; i++) {
FstTransition *tran = taosArrayGet(src->trans, i);
taosArrayPush(trans, tran);
taosArrayPush(trans, tran);
}
node->trans = trans;
node->trans = trans;
node->isFinal = src->isFinal;
node->finalOutput = src->finalOutput;
return node;
}
// not destroy src, User's bussiness
// not destroy src, User's bussiness
void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src) {
if (dst == NULL || src == NULL) { return; }
if (dst == NULL || src == NULL) {
return;
}
dst->isFinal = src->isFinal;
dst->isFinal = src->isFinal;
dst->finalOutput = src->finalOutput;
//release free avoid mem leak
taosArrayDestroy(dst->trans);
// release free avoid mem leak
taosArrayDestroy(dst->trans);
size_t sz = taosArrayGetSize(src->trans);
dst->trans = taosArrayInit(sz, sizeof(FstTransition));
dst->trans = taosArrayInit(sz, sizeof(FstTransition));
for (size_t i = 0; i < sz; i++) {
FstTransition *trn = taosArrayGet(src->trans, i);
FstTransition *trn = taosArrayGet(src->trans, i);
taosArrayPush(dst->trans, trn);
}
}
}
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr
// startAddr) {
//bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
//size_t sz = taosArrayGetSize(b->trans);
//assert(sz < 256);
//if (FST_BUILDER_NODE_IS_FINAL(b)
// && FST_BUILDER_NODE_TRANS_ISEMPTY(b)
// && FST_BUILDER_NODE_FINALOUTPUT_ISZERO(b)) {
// return true;
//} else if (sz != 1 || b->isFinal) {
// // AnyTrans->Compile(w, addr, node);
//} else {
// FstTransition *tran = taosArrayGet(b->trans, 0);
// if (tran->addr == lastAddr && tran->out == 0) {
// //OneTransNext::compile(w, lastAddr, tran->inp);
// return true;
// } else {
// //OneTrans::Compile(w, lastAddr, *tran);
// return true;
// }
//}
//return true;
//}
// size_t sz = taosArrayGetSize(b->trans);
// assert(sz < 256);
// if (FST_BUILDER_NODE_IS_FINAL(b)
// && FST_BUILDER_NODE_TRANS_ISEMPTY(b)
// && FST_BUILDER_NODE_FINALOUTPUT_ISZERO(b)) {
// return true;
//} else if (sz != 1 || b->isFinal) {
// // AnyTrans->Compile(w, addr, node);
//} else {
// FstTransition *tran = taosArrayGet(b->trans, 0);
// if (tran->addr == lastAddr && tran->out == 0) {
// //OneTransNext::compile(w, lastAddr, tran->inp);
// return true;
// } else {
// //OneTrans::Compile(w, lastAddr, *tran);
// return true;
// }
//}
// return true;
//}
......@@ -15,33 +15,33 @@
#include "index_fst_registry.h"
uint64_t fstRegistryHash(FstRegistry *registry, FstBuilderNode *bNode) {
//TODO(yihaoDeng): refactor later
// TODO(yihaoDeng): refactor later
const uint64_t FNV_PRIME = 1099511628211;
uint64_t h = 14695981039346656037u;
uint64_t h = 14695981039346656037u;
h = (h ^ (uint64_t)bNode->isFinal) * FNV_PRIME;
h = (h ^ (uint64_t)bNode->isFinal) * FNV_PRIME;
h = (h ^ (bNode)->finalOutput) * FNV_PRIME;
uint32_t sz = (uint32_t)taosArrayGetSize(bNode->trans);
uint32_t sz = (uint32_t)taosArrayGetSize(bNode->trans);
for (uint32_t i = 0; i < sz; i++) {
FstTransition *trn = taosArrayGet(bNode->trans, i);
h = (h ^ (uint64_t)(trn->inp)) * FNV_PRIME;
h = (h ^ (uint64_t)(trn->out)) * FNV_PRIME;
h = (h ^ (uint64_t)(trn->addr))* FNV_PRIME;
}
return h %(registry->tableSize);
h = (h ^ (uint64_t)(trn->inp)) * FNV_PRIME;
h = (h ^ (uint64_t)(trn->out)) * FNV_PRIME;
h = (h ^ (uint64_t)(trn->addr)) * FNV_PRIME;
}
return h % (registry->tableSize);
}
static void fstRegistryCellSwap(SArray *arr, uint32_t a, uint32_t b) {
size_t sz = taosArrayGetSize(arr);
if (a >= sz || b >= sz) { return; }
if (a >= sz || b >= sz) {
return;
}
FstRegistryCell *cell1 = (FstRegistryCell *)taosArrayGet(arr, a);
FstRegistryCell *cell1 = (FstRegistryCell *)taosArrayGet(arr, a);
FstRegistryCell *cell2 = (FstRegistryCell *)taosArrayGet(arr, b);
FstRegistryCell t = {.addr = cell1->addr, .node = cell1->node};
FstRegistryCell t = {.addr = cell1->addr, .node = cell1->node};
cell1->addr = cell2->addr;
cell1->node = cell2->node;
......@@ -52,49 +52,55 @@ static void fstRegistryCellSwap(SArray *arr, uint32_t a, uint32_t b) {
}
static void fstRegistryCellPromote(SArray *arr, uint32_t start, uint32_t end) {
size_t sz = taosArrayGetSize(arr);
if (start >= sz && end >= sz) {return; }
size_t sz = taosArrayGetSize(arr);
if (start >= sz && end >= sz) {
return;
}
assert(start >= end);
int32_t s = (int32_t)start;
int32_t e = (int32_t)end;
while(s > e) {
while (s > e) {
fstRegistryCellSwap(arr, s - 1, s);
s -= 1;
}
}
FstRegistry* fstRegistryCreate(uint64_t tableSize, uint64_t mruSize) {
FstRegistry *registry = malloc(sizeof(FstRegistry));
if (registry == NULL) { return NULL ;}
FstRegistry *fstRegistryCreate(uint64_t tableSize, uint64_t mruSize) {
FstRegistry *registry = malloc(sizeof(FstRegistry));
if (registry == NULL) {
return NULL;
}
uint64_t nCells = tableSize * mruSize;
SArray* tb = (SArray *)taosArrayInit(nCells, sizeof(FstRegistryCell));
if (NULL == tb) {
free(registry);
return NULL;
uint64_t nCells = tableSize * mruSize;
SArray * tb = (SArray *)taosArrayInit(nCells, sizeof(FstRegistryCell));
if (NULL == tb) {
free(registry);
return NULL;
}
for (uint64_t i = 0; i < nCells; i++) {
FstRegistryCell cell = {.addr = NONE_ADDRESS, .node = fstBuilderNodeDefault()};
taosArrayPush(tb, &cell);
FstRegistryCell cell = {.addr = NONE_ADDRESS, .node = fstBuilderNodeDefault()};
taosArrayPush(tb, &cell);
}
registry->table = tb;
registry->tableSize = tableSize;
registry->mruSize = mruSize;
return registry;
registry->table = tb;
registry->tableSize = tableSize;
registry->mruSize = mruSize;
return registry;
}
void fstRegistryDestroy(FstRegistry *registry) {
if (registry == NULL) { return; }
if (registry == NULL) {
return;
}
SArray *tb = registry->table;
size_t sz = taosArrayGetSize(tb);
size_t sz = taosArrayGetSize(tb);
for (size_t i = 0; i < sz; i++) {
FstRegistryCell *cell = taosArrayGet(tb, i);
fstBuilderNodeDestroy(cell->node);
FstRegistryCell *cell = taosArrayGet(tb, i);
fstBuilderNodeDestroy(cell->node);
}
taosArrayDestroy(tb);
free(registry);
......@@ -102,74 +108,70 @@ void fstRegistryDestroy(FstRegistry *registry) {
FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNode) {
if (taosArrayGetSize(registry->table) <= 0) {
return NULL;
}
return NULL;
}
uint64_t bucket = fstRegistryHash(registry, bNode);
uint64_t start = registry->mruSize * bucket;
uint64_t end = start + registry->mruSize;
uint64_t start = registry->mruSize * bucket;
uint64_t end = start + registry->mruSize;
FstRegistryEntry *entry = malloc(sizeof(FstRegistryEntry));
if (end - start == 1) {
FstRegistryCell *cell = taosArrayGet(registry->table, start);
//cell->isNode &&
FstRegistryCell *cell = taosArrayGet(registry->table, start);
// cell->isNode &&
if (cell->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell->node, bNode)) {
entry->state = FOUND;
entry->addr = cell->addr ;
return entry;
entry->state = FOUND;
entry->addr = cell->addr;
return entry;
} else {
fstBuilderNodeCloneFrom(cell->node, bNode);
entry->state = NOTFOUND;
entry->cell = cell; // copy or not
fstBuilderNodeCloneFrom(cell->node, bNode);
entry->state = NOTFOUND;
entry->cell = cell; // copy or not
}
} else if (end - start == 2) {
FstRegistryCell *cell1 = taosArrayGet(registry->table, start);
FstRegistryCell *cell1 = taosArrayGet(registry->table, start);
if (cell1->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell1->node, bNode)) {
entry->state = FOUND;
entry->addr = cell1->addr;
entry->state = FOUND;
entry->addr = cell1->addr;
return entry;
}
FstRegistryCell *cell2 = taosArrayGet(registry->table, start + 1);
}
FstRegistryCell *cell2 = taosArrayGet(registry->table, start + 1);
if (cell2->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell2->node, bNode)) {
entry->state = FOUND;
entry->addr = cell2->addr;
entry->state = FOUND;
entry->addr = cell2->addr;
// must swap here
fstRegistryCellSwap(registry->table, start, start + 1);
return entry;
fstRegistryCellSwap(registry->table, start, start + 1);
return entry;
}
//clone from bNode, refactor later
// clone from bNode, refactor later
fstBuilderNodeCloneFrom(cell2->node, bNode);
fstRegistryCellSwap(registry->table, start, start + 1);
FstRegistryCell *cCell = taosArrayGet(registry->table, start);
entry->state = NOTFOUND;
entry->cell = cCell;
entry->state = NOTFOUND;
entry->cell = cCell;
} else {
uint32_t i = start;
uint32_t i = start;
for (; i < end; i++) {
FstRegistryCell *cell = (FstRegistryCell *)taosArrayGet(registry->table, i);
if (cell->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell->node, bNode)) {
entry->state = FOUND;
entry->addr = cell->addr;
entry->state = FOUND;
entry->addr = cell->addr;
fstRegistryCellPromote(registry->table, i, start);
break;
}
}
}
if (i >= end) {
uint64_t last = end - 1;
FstRegistryCell *cell = (FstRegistryCell *)taosArrayGet(registry->table, last);
//clone from bNode, refactor later
fstBuilderNodeCloneFrom(cell->node, bNode);
uint64_t last = end - 1;
FstRegistryCell *cell = (FstRegistryCell *)taosArrayGet(registry->table, last);
// clone from bNode, refactor later
fstBuilderNodeCloneFrom(cell->node, bNode);
fstRegistryCellPromote(registry->table, last, start);
FstRegistryCell *cCell = taosArrayGet(registry->table, start);
entry->state = NOTFOUND;
entry->cell = cCell;
entry->state = NOTFOUND;
entry->cell = cCell;
}
}
}
return entry;
}
void fstRegistryEntryDestroy(FstRegistryEntry *entry) {
free(entry);
}
void fstRegistryEntryDestroy(FstRegistryEntry *entry) { free(entry); }
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册