提交 0a810dd6 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode3

......@@ -12,6 +12,7 @@ debug/
......@@ -544,7 +544,7 @@ void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) {
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, const void *value) {
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
SColIdx* pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
......@@ -44,10 +44,10 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);
int32_t parseAbsoluteDuration(char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);
int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision);
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
void deltaToUtcInitOnce();
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision);
......@@ -40,7 +40,7 @@ int32_t toInteger(const char* z, int32_t n, int32_t base, int64_t* value, bool*
bool taosVariantIsValid(SVariant *pVar);
void taosVariantCreate(SVariant *pVar, char* z, int32_t n, int32_t type);
void taosVariantCreate(SVariant *pVar, const char* z, int32_t n, int32_t type);
void taosVariantCreateFromBinary(SVariant *pVar, const char *pz, size_t len, uint32_t type);
......@@ -131,6 +131,18 @@ struct SInsertStmtInfo;
bool qIsInsertSql(const char* pStr, size_t length);
typedef struct SParseContext {
const char* pSql; // sql string
size_t sqlLen; // length of the sql string
int64_t id; // operator id, generated by uuid generator
const char* pDbname;
const SEpSet* pEpSet;
int8_t schemaAttached; // denote if submit block is built with table schema or not
char* pMsg; // extended error message if exists to help avoid the problem in sql statement.
int32_t msgLen; // max length of the msg
} SParseContext;
* Parse the sql statement and then return the SQueryStmtInfo as the result of bounded AST.
* @param pSql sql string
......@@ -141,16 +153,35 @@ bool qIsInsertSql(const char* pStr, size_t length);
int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo** pQueryInfo, int64_t id, char* msg, int32_t msgLen);
typedef enum {
} EPayloadType;
typedef struct SVgDataBlocks {
int64_t vgId; // virtual group id
int32_t numOfTables; // number of tables in current submit block
uint32_t size;
char *pData;
} SVgDataBlocks;
typedef struct SInsertStmtInfo {
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
} SInsertStmtInfo;
* Parse the insert sql statement.
* @param pStr sql string
* @param length length of the sql string
* @param pInsertParam data in binary format to submit to vnode directly.
* @param id operator id, generated by uuid generator.
* @param msg extended error message if exists to help avoid the problem in sql statement.
* @return
* @return data in binary format to submit to vnode directly.
int32_t qParseInsertSql(const char* pStr, size_t length, struct SInsertStmtInfo** pInsertInfo, int64_t id, char* msg, int32_t msgLen);
int32_t qParseInsertSql(SParseContext* pContext, struct SInsertStmtInfo** pInfo);
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that
......@@ -38,34 +38,38 @@ typedef enum {
} EWalType;
typedef struct {
//union {
//uint32_t info;
//struct {
//uint32_t sver:3;
//uint32_t msgtype: 5;
//uint32_t reserved : 24;
typedef struct SWalReadHead {
int8_t sver;
uint8_t msgType;
int8_t reserved[2];
int32_t len;
int64_t version;
uint32_t signature;
uint32_t cksumHead;
uint32_t cksumBody;
char cont[];
} SWalHead;
} SWalReadHead;
typedef struct {
int32_t vgId;
int32_t fsyncPeriod; // millisecond
int32_t rollPeriod;
int32_t retentionPeriod; // secs
int32_t rollPeriod; // secs
int64_t segSize;
EWalType walLevel; // wal level
} SWalCfg;
typedef struct {
//union {
//uint32_t info;
//struct {
//uint32_t sver:3;
//uint32_t msgtype: 5;
//uint32_t reserved : 24;
uint32_t cksumHead;
uint32_t cksumBody;
SWalReadHead head;
} SWalHead;
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
......@@ -103,21 +107,17 @@ typedef struct SWal {
//write tfd
int64_t writeLogTfd;
int64_t writeIdxTfd;
//read tfd
int64_t readLogTfd;
int64_t readIdxTfd;
//current version
int64_t curVersion;
//wal lifecycle
int64_t firstVersion;
int64_t snapshotVersion;
int64_t commitVersion;
int64_t lastVersion;
//snapshotting version
int64_t snapshottingVer;
//roll status
int64_t lastRollSeq;
//file set
int32_t writeCur;
int32_t readCur;
SArray* fileInfoSet;
int32_t curStatus;
......@@ -148,7 +148,8 @@ int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t walRollback(SWal *, int64_t ver);
// notify that previous logs can be pruned safely
int32_t walTakeSnapshot(SWal *, int64_t ver);
int32_t walBeginTakeSnapshot(SWal *, int64_t ver);
int32_t walEndTakeSnapshot(SWal *);
//int32_t walDataCorrupted(SWal*);
// read
......@@ -38,11 +38,11 @@ extern "C" {
(dst)[(size)-1] = 0; \
} while (0)
int64_t taosStr2int64(char *str);
int64_t taosStr2int64(const char *str);
int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs);
bool taosMbsToUcs4(char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len, int32_t *len);
bool taosMbsToUcs4(const char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len, int32_t *len);
int32_t tasoUcs4Compare(void *f1_ucs4, void *f2_ucs4, int32_t bytes, int8_t ncharSize);
bool taosValidateEncodec(const char *encodec);
char * taosCharsetReplace(char *charsetstr);
......@@ -153,6 +153,13 @@ void taosArraySet(SArray* pArray, size_t index, void* pData);
void taosArrayPopFrontBatch(SArray* pArray, size_t cnt);
* remove some data entry from front
* @param pArray
* @param cnt
void taosArrayPopTailBatch(SArray* pArray, size_t cnt);
* remove data entry of the given index
* @param pArray
......@@ -213,6 +220,14 @@ void taosArraySortString(SArray* pArray, __compar_fn_t comparFn);
void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags);
* search the array, return index of the element
* @param pArray
* @param compar
* @param key
int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags);
* search the array
* @param pArray
......@@ -39,7 +39,7 @@ static FORCE_INLINE int taosCalcChecksumAppend(TSCKSUM csi, uint8_t *stream, uin
static FORCE_INLINE int taosCheckChecksum(const uint8_t *stream, uint32_t ssize, TSCKSUM checksum) {
return (checksum == (*crc32c)(0, stream, (size_t)ssize));
return (checksum != (*crc32c)(0, stream, (size_t)ssize));
static FORCE_INLINE int taosCheckChecksumWhole(const uint8_t *stream, uint32_t ssize) {
......@@ -29,7 +29,7 @@ int32_t strdequote(char *src);
int32_t strndequote(char *dst, const char *z, int32_t len);
int32_t strRmquote(char *z, int32_t len);
size_t strtrim(char *src);
char *strnchr(char *haystack, char needle, int32_t len, bool skipquote);
char *strnchr(const char *haystack, char needle, int32_t len, bool skipquote);
char **strsplit(char *src, const char *delim, int32_t *num);
char *strtolower(char *dst, const char *src);
char *strntolower(char *dst, const char *src, int32_t n);
......@@ -82,18 +82,18 @@ void deltaToUtcInitOnce() {
static int64_t parseFraction(char* str, char** end, int32_t timePrec);
static int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec, char delim);
static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim);
static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec);
static int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec);
static char* forwardToTimeStringEnd(char* str);
static bool checkTzPresent(char *str, int32_t len);
static bool checkTzPresent(const char *str, int32_t len);
static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t timePrec) = {
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) {
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) {
/* parse datatime string in with tz */
if (strnchr(timestr, 'T', len, false) != NULL) {
return parseTimeWithTz(timestr, time, timePrec, 'T');
......@@ -104,7 +104,7 @@ int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePre
bool checkTzPresent(char *str, int32_t len) {
bool checkTzPresent(const char *str, int32_t len) {
char *seg = forwardToTimeStringEnd(str);
int32_t seg_len = len - (int32_t)(seg - str);
......@@ -237,7 +237,7 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) {
* 2013-04-12T15:52:01+0800
* 2013-04-12T15:52:01.123+0800
int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec, char delim) {
int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim) {
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
......@@ -432,7 +432,7 @@ static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t time
* d - Days (24 hours)
* w - Weeks (7 days)
int32_t parseAbsoluteDuration(char* token, int32_t tokenlen, int64_t* duration, char* unit, int32_t timePrecision) {
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* duration, char* unit, int32_t timePrecision) {
errno = 0;
char* endPtr = NULL;
......@@ -75,7 +75,7 @@ int32_t toInteger(const char* z, int32_t n, int32_t base, int64_t* value, bool*
return 0;
void taosVariantCreate(SVariant *pVar, char* z, int32_t n, int32_t type) {
void taosVariantCreate(SVariant *pVar, const char* z, int32_t n, int32_t type) {
int32_t ret = 0;
memset(pVar, 0, sizeof(SVariant));
......@@ -26,10 +26,24 @@ extern "C" {
#include "index_fst_counting_writer.h"
#include "index_fst_automation.h"
#define OUTPUT_PREFIX(a, b) ((a) > (b) ? (b) : (a)
typedef struct Fst Fst;
typedef struct FstNode FstNode;
#define OUTPUT_PREFIX(a, b) ((a) > (b) ? (b) : (a)
typedef enum { Included, Excluded, Unbounded} FstBound;
typedef struct FstBoundWithData {
FstSlice data;
FstBound type;
} FstBoundWithData;
typedef struct FstStreamBuilder {
Fst *fst;
AutomationCtx *aut;
FstBoundWithData *min;
FstBoundWithData *max;
} FstStreamBuilder, FstStreamWithStateBuilder;
typedef struct FstRange {
uint64_t start;
......@@ -39,16 +53,9 @@ typedef struct FstRange {
typedef enum {GE, GT, LE, LT} RangeType;
typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal} State;
typedef enum {Ordered, OutOfOrdered, DuplicateKey} OrderType;
typedef enum { Included, Excluded, Unbounded} FstBound;
typedef struct FstBoundWithData {
FstSlice data;
FstBound type;
} FstBoundWithData;
FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data);
bool fstBoundWithDataExceededBy(FstBoundWithData *bound, FstSlice *slice);
bool fstBoundWithDataIsEmpty(FstBoundWithData *bound);
......@@ -60,8 +67,6 @@ typedef struct FstOutput {
Output out;
} FstOutput;
* UnFinished node and helper function
......@@ -275,6 +280,9 @@ FstNode* fstGetRoot(Fst *fst);
FstType fstGetType(Fst *fst);
CompiledAddr fstGetRootAddr(Fst *fst);
Output fstEmptyFinalOutput(Fst *fst, bool *null);
FstStreamBuilder *fstSearch(Fst *fst, AutomationCtx *ctx);
FstStreamWithStateBuilder *fstSearchWithState(Fst *fst, AutomationCtx *ctx);
bool fstVerify(Fst *fst);
......@@ -292,12 +300,12 @@ void streamStateDestroy(void *s);
typedef struct StreamWithState {
Fst *fst;
Automation *aut;
AutomationCtx *aut;
SArray *inp;
FstOutput emptyOutput;
SArray *stack; // <StreamState>
FstBoundWithData *endAt;
} StreamWithState ;
} StreamWithState;
typedef struct StreamWithStateResult {
FstSlice data;
......@@ -310,19 +318,13 @@ StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *sta
void swsResultDestroy(StreamWithStateResult *result);
typedef void* (*StreamCallback)(void *);
StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) ;
StreamWithState *streamWithStateCreate(Fst *fst, AutomationCtx *automation, FstBoundWithData *min, FstBoundWithData *max) ;
void streamWithStateDestroy(StreamWithState *sws);
bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min);
StreamWithStateResult* streamWithStateNextWith(StreamWithState *sws, StreamCallback callback);
typedef struct FstStreamBuilder {
Fst *fst;
Automation *aut;
FstBoundWithData *min;
FstBoundWithData *max;
} FstStreamBuilder;
StreamWithStateResult* streamWithStateNextWith(StreamWithState *sws, StreamCallback callback);
FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut);
FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, AutomationCtx *aut);
// set up bound range
// refator, simple code by marco
......@@ -19,33 +19,50 @@
extern "C" {
#include "index_fst_util.h"
typedef struct AutomationCtx AutomationCtx;
typedef enum AutomationType {
} AutomationType;
typedef struct StartWith {
AutomationCtx *autoSelf;
} StartWith;
typedef struct Complement {
AutomationCtx *autoSelf;
} Complement;
// automation
typedef struct AutomationCtx {
// automation interface
AutomationType type;
void *data;
} AutomationCtx;
typedef struct Automation {
void* (*start)() ;
bool (*isMatch)(void *);
bool (*canMatch)(void *data);
bool (*willAlwaysMatch)(void *state);
void* (*accept)(void *state, uint8_t byte);
void* (*acceptEof)(void *state);
void *data;
} Automation;
typedef enum StartWithStateKind { Done, Running } StartWithStateKind;
typedef struct StartWithStateValue {
StartWithStateKind kind;
void *value;
} StartWithStateValue;
typedef struct AutomationFunc {
void* (*start)(AutomationCtx *ctx) ;
bool (*isMatch)(AutomationCtx *ctx, void *);
bool (*canMatch)(AutomationCtx *ctx, void *data);
bool (*willAlwaysMatch)(AutomationCtx *ctx, void *state);
void* (*accept)(AutomationCtx *ctx, void *state, uint8_t byte);
void* (*acceptEof)(AutomationCtx *ct, void *state);
} AutomationFunc;
AutomationCtx *automCtxCreate(void *data, AutomationType type);
void automCtxDestroy(AutomationCtx *ctx);
extern AutomationFunc automFuncs[];
#ifdef __cplusplus
......@@ -591,14 +591,14 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
uint8_t *data = fstSliceData(&t, &len);
int i = 0;
for(; i < len; i++) {
//uint8_t v = slice->data[slice->start + i];
////slice->data[slice->start + i];
uint8_t v = data[i];
if (v == b) {
return node->nTrans - i - 1; // bug
if (i == len) { *null = true; }
......@@ -634,7 +634,7 @@ FstNode *fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice *slice) {
} else if (st.state == OneTrans) {
FstSlice data = fstSliceCopy(slice, 0, addr);
PackSizes sz = fstStateSizes(&st, &data);
n->data = fstSliceCopy(slice, 0, addr);
n->data = data;
n->version = version;
n->state = st;
n->start = addr;
......@@ -803,6 +803,7 @@ void fstBuilderDestroy(FstBuilder *b) {
......@@ -851,8 +852,9 @@ void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in) {
OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup) {
FstSlice *input = &bs;
if (fstSliceIsEmpty(&b->last)) {
// deep copy or not
b->last = fstSliceCopy(&bs, input->start, input->end);
b->last = fstSliceDeepCopy(&bs, input->start, input->end);
} else {
int comp = fstSliceCompare(&b->last, &bs);
if (comp == 0 && ckDup) {
......@@ -862,20 +864,22 @@ OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup) {
// deep copy or not
b->last = fstSliceCopy(&bs, input->start, input->end);
b->last = fstSliceDeepCopy(&bs, input->start, input->end);
return Ordered;
void fstBuilderCompileFrom(FstBuilder *b, uint64_t istate) {
CompiledAddr addr = NONE_ADDRESS;
while (istate + 1 < FST_UNFINISHED_NODES_LEN(b->unfinished)) {
FstBuilderNode *n = NULL;
FstBuilderNode *bn = NULL;
if (addr == NONE_ADDRESS) {
n = fstUnFinishedNodesPopEmpty(b->unfinished);
bn = fstUnFinishedNodesPopEmpty(b->unfinished);
} else {
n = fstUnFinishedNodesPopFreeze(b->unfinished, addr);
bn = fstUnFinishedNodesPopFreeze(b->unfinished, addr);
addr = fstBuilderCompile(b, n);
addr = fstBuilderCompile(b, bn);
assert(addr != NONE_ADDRESS);
......@@ -910,6 +914,7 @@ void* fstBuilderInsertInner(FstBuilder *b) {
fstBuilderCompileFrom(b, 0);
FstBuilderNode *rootNode = fstUnFinishedNodesPopRoot(b->unfinished);
CompiledAddr rootAddr = fstBuilderCompile(b, rootNode);
char buf64[8] = {0};
......@@ -1026,7 +1031,10 @@ Fst* fstCreate(FstSlice *slice) {
fst->meta->ty = type;
fst->meta->len = fstLen;
fst->meta->checkSum = checkSum;
fst->data = slice;
FstSlice *s = calloc(1, sizeof(FstSlice));
*s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice));
fst->data = s;
return fst;
......@@ -1038,7 +1046,8 @@ FST_CREAT_FAILED:
void fstDestroy(Fst *fst) {
if (fst) {
......@@ -1048,6 +1057,9 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) {
Output tOut = 0;
int32_t len;
uint8_t *data = fstSliceData(b, &len);
SArray *nodes = (SArray *)taosArrayInit(len, sizeof(FstNode *));
taosArrayPush(nodes, &root);
for (uint32_t i = 0; i < len; i++) {
uint8_t inp = data[i];
Output res = 0;
......@@ -1059,16 +1071,31 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) {
fstNodeGetTransitionAt(root, res, &trn);
tOut += trn.out;
root = fstGetNode(fst, trn.addr);
taosArrayPush(nodes, &root);
if (!FST_NODE_IS_FINAL(root)) {
return false;
} else {
tOut = tOut + FST_NODE_FINAL_OUTPUT(root);
for (size_t i = 0; i < taosArrayGetSize(nodes); i++) {
FstNode **node = (FstNode **)taosArrayGet(nodes, i);
fst->root = NULL;
*out = tOut;
return true;
FstStreamBuilder *fstSearch(Fst *fst, AutomationCtx *ctx) {
return fstStreamBuilderCreate(fst, ctx);
FstStreamWithStateBuilder *fstSearchWithState(Fst *fst, AutomationCtx *ctx) {
return fstStreamBuilderCreate(fst, ctx);
FstNode *fstGetRoot(Fst *fst) {
if (fst->root != NULL) {
......@@ -1155,7 +1182,7 @@ void fstBoundDestroy(FstBoundWithData *bound) {
StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) {
StreamWithState *streamWithStateCreate(Fst *fst, AutomationCtx *automation, FstBoundWithData *min, FstBoundWithData *max) {
StreamWithState *sws = calloc(1, sizeof(StreamWithState));
if (sws == NULL) { return NULL; }
......@@ -1182,6 +1209,8 @@ void streamWithStateDestroy(StreamWithState *sws) {
bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
AutomationCtx *aut = sws->aut;
if (fstBoundWithDataIsEmpty(min)) {
if (fstBoundWithDataIsIncluded(min)) {
sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null));
......@@ -1189,7 +1218,7 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
StreamState s = {.node = fstGetRoot(sws->fst),
.trans = 0,
.out = {.null = false, .out = 0},
.autState = sws->aut->start()}; // auto.start callback
.autState = automFuncs[aut->type].start(aut)}; // auto.start callback
taosArrayPush(sws->stack, &s);
return true;
......@@ -1207,7 +1236,8 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
FstNode *node = fstGetRoot(sws->fst);
Output out = 0;
void* autState = sws->aut->start();
//void* autState = sws->aut->start();
void* autState = automFuncs[aut->type].start(aut);
int32_t len;
uint8_t *data = fstSliceData(key, &len);
......@@ -1219,7 +1249,8 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
FstTransition trn;
fstNodeGetTransitionAt(node, res, &trn);
void *preState = autState;
autState = sws->aut->accept(preState, b);
// autState = sws->aut->accept(preState, b);
autState = automFuncs[aut->type].accept(aut, preState, b);
taosArrayPush(sws->inp, &b);
StreamState s = {.node = node,
.trans = res + 1,
......@@ -1228,6 +1259,7 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
taosArrayPush(sws->stack, &s);
out += trn.out;
node = fstGetNode(sws->fst, trn.addr);
} else {
// This is a little tricky. We're in this case if the
......@@ -1275,6 +1307,7 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallback callback) {
AutomationCtx *aut = sws->aut;
FstOutput output = sws->emptyOutput;
if (output.null == false) {
FstSlice emptySlice = fstSliceCreate(NULL, 0);
......@@ -1283,15 +1316,15 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState));
return NULL;
void* start = sws->aut->start();
if (sws->aut->isMatch(start)) {
void *start = automFuncs[aut->type].start(aut);
if (automFuncs[aut->type].isMatch(aut, start)) {
FstSlice s = fstSliceCreate(NULL, 0);
return swsResultCreate(&s, output, callback(start));
while (taosArrayGetSize(sws->stack) > 0) {
StreamState *p = (StreamState *)taosArrayPop(sws->stack);
if (p->trans >= FST_NODE_LEN(p->node) || !sws->aut->canMatch(p->autState)) {
if (p->trans >= FST_NODE_LEN(p->node) || automFuncs[aut->type].canMatch(aut, p->autState)) {
if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) {
......@@ -1301,16 +1334,18 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
FstTransition trn;
fstNodeGetTransitionAt(p->node, p->trans, &trn);
Output out = p->out.out + trn.out;
void* nextState = sws->aut->accept(p->autState, trn.inp);
void* nextState = automFuncs[aut->type].accept(aut, p->autState, trn.inp);
void* tState = callback(nextState);
bool isMatch = sws->aut->isMatch(nextState);
bool isMatch = automFuncs[aut->type].isMatch(aut, nextState);
//bool isMatch = sws->aut->isMatch(nextState);
FstNode *nextNode = fstGetNode(sws->fst, trn.addr);
taosArrayPush(sws->inp, &(trn.inp));
if (FST_NODE_IS_FINAL(nextNode)) {
void *eofState = sws->aut->acceptEof(nextState);
//void *eofState = sws->aut->acceptEof(nextState);
void *eofState = automFuncs[aut->type].acceptEof(aut, nextState);
if (eofState != NULL) {
isMatch = sws->aut->isMatch(eofState);
isMatch = automFuncs[aut->type].isMatch(aut, eofState);
StreamState s1 = { .node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState};
......@@ -1368,7 +1403,7 @@ void streamStateDestroy(void *s) {
FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut) {
FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, AutomationCtx *aut) {
FstStreamBuilder *b = calloc(1, sizeof(FstStreamBuilder));
if (NULL == b) { return NULL; }
......@@ -1411,3 +1446,5 @@ FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, Rang
......@@ -13,3 +13,94 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "index_fst_automation.h"
// prefix query, impl later
static void* prefixStart(AutomationCtx *ctx) {
StartWithStateValue *data = (StartWithStateValue *)(ctx->data);
return data;
static bool prefixIsMatch(AutomationCtx *ctx, void *data) {
return true;
static bool prefixCanMatch(AutomationCtx *ctx, void *data) {
return true;
static bool prefixWillAlwaysMatch(AutomationCtx *ctx, void *state) {
return true;
static void* prefixAccept(AutomationCtx *ctx, void *state, uint8_t byte) {
return NULL;
static void* prefixAcceptEof(AutomationCtx *ctx, void *state) {
return NULL;
// pattern query, impl later
static void* patternStart(AutomationCtx *ctx) {
return NULL;
static bool patternIsMatch(AutomationCtx *ctx, void *data) {
return true;
static bool patternCanMatch(AutomationCtx *ctx, void *data) {
return true;
static bool patternWillAlwaysMatch(AutomationCtx *ctx, void *state) {
return true;
static void* patternAccept(AutomationCtx *ctx, void *state, uint8_t byte) {
return NULL;
static void* patternAcceptEof(AutomationCtx *ctx, void *state) {
return NULL;
AutomationFunc automFuncs[] = {{
// add more search type
AutomationCtx* automCtxCreate(void *data, AutomationType type) {
AutomationCtx *ctx = calloc(1, sizeof(AutomationCtx));
if (ctx == NULL) { return NULL; }
if (type == AUTOMATION_PREFIX) {
StartWithStateValue *swsv = (StartWithStateValue *)calloc(1, sizeof(StartWithStateValue));
swsv->kind = Done;
swsv->value = NULL;
ctx->data = (void *)swsv;
} else if (type == AUTMMATION_MATCH) {
} else {
// add more search type
ctx->type = type;
return ctx;
void automCtxDestroy(AutomationCtx *ctx) {
if (ctx->type == AUTOMATION_PREFIX) {
} else if (ctx->type == AUTMMATION_MATCH) {
......@@ -65,6 +65,7 @@ class FstReadMemory {
~FstReadMemory() {
......@@ -129,10 +130,12 @@ class FstReadMemory {
#define L 100
#define M 100
#define N 100
int Performance_fstWriteRecords(FstWriter *b) {
std::string str("aa");
int L = 100, M = 100, N = 10;
for (int i = 0; i < L; i++) {
str[0] = 'a' + i;
......@@ -150,22 +153,29 @@ int Performance_fstWriteRecords(FstWriter *b) {
void Performance_fstReadRecords(FstReadMemory *m) {
std::string str("a");
for (int i = 0; i < 50; i++) {
//std::string str("aa");
std::string str("aa");
for (int i = 0; i < M; i++) {
str[0] = 'a' + i;
for(int j = 0; j < N; j++) {
str[1] = 'a' + j;
for (int k = 0; k < L; k++) {
uint64_t out, cost;
bool ok = m->GetWithTimeCostUs(str, &out, &cost);
if (ok == true) {
printf("success to get (%s, %" PRId64"), time cost: %" PRId64")\n", str.c_str(), out, cost);
uint64_t val, cost;
if (m->GetWithTimeCostUs(str, &val, &cost)) {
printf("succes to get kv(%s, %" PRId64"), cost: %" PRId64"\n", str.c_str(), val, cost);
} else {
printf("failed to get(%s)\n", str.c_str());
printf("failed to get key: %s\n", str.c_str());
void checkFstPerf() {
FstWriter *fw = new FstWriter;
int64_t s = taosGetTimestampUs();
int num = Performance_fstWriteRecords(fw);
int64_t e = taosGetTimestampUs();
printf("write %d record cost %" PRId64"us\n", num, e - s);
......@@ -173,13 +183,11 @@ void checkFstPerf() {
FstReadMemory *m = new FstReadMemory(1024 * 64);
if (m->init()) {
uint64_t val;
if(m->Get("aaaaaaa", &val)) {
std::cout << "succes to Get val: " << val << std::endl;
} else {
std::cout << "failed to Get " << std::endl;
printf("success to init fst read");
delete m;
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "catalog.h"
#include "os.h"
#include "ttypes.h"
#include "tname.h"
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
typedef enum EOrderStatus {
} EOrderStatus;
typedef enum EValStat {
VAL_STAT_HAS = 0x0, // 0 means has val
VAL_STAT_NONE = 0x01, // 1 means no val
} EValStat;
typedef enum ERowCompareStat {
} ERowCompareStat;
typedef struct SBoundColumn {
int32_t offset; // all column offset value
int32_t toffset; // first part offset for SDataRow TODO: get offset from STSchema on future
uint8_t valStat; // EValStat. denote if current column bound or not(0 means has val, 1 means no val)
} SBoundColumn;
typedef struct {
uint16_t schemaColIdx;
uint16_t boundIdx;
uint16_t finalIdx;
} SBoundIdxInfo;
typedef struct SParsedDataColInfo {
int16_t numOfCols;
int16_t numOfBound;
uint16_t flen; // TODO: get from STSchema
uint16_t allNullLen; // TODO: get from STSchema
uint16_t extendedVarLen;
int32_t *boundedColumns; // bound column idx according to schema
SBoundColumn *cols;
SBoundIdxInfo *colIdxInfo;
int8_t orderStatus; // bound columns
} SParsedDataColInfo;
typedef struct SMemRowInfo {
int32_t dataLen; // len of SDataRow
int32_t kvLen; // len of SKVRow
} SMemRowInfo;
typedef struct {
uint8_t memRowType; // default is 0, that is SDataRow
uint8_t compareStat; // 0 no need, 1 need compare
TDRowTLenT kvRowInitLen;
SMemRowInfo *rowInfo;
} SMemRowBuilder;
typedef struct SParamInfo {
int32_t idx;
uint8_t type;
uint8_t timePrec;
int16_t bytes;
uint32_t offset;
} SParamInfo;
typedef struct STableDataBlocks {
SName tableName;
int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not
int64_t vgId; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfTables; // number of tables in current submit block
int32_t rowSize; // row size for current table
uint32_t nAllocSize;
uint32_t headerSize; // header for table info (uid, tid, submit metadata)
uint32_t size;
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
char *pData;
bool cloned;
STagData tagData;
SParsedDataColInfo boundColumnInfo;
// for parameter ('?') binding
uint32_t numOfAllocedParams;
uint32_t numOfParams;
SParamInfo * params;
SMemRowBuilder rowBuilder;
} STableDataBlocks;
static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) {
memRowSetType(row, memRowType);
if (isDataRowT(memRowType)) {
dataRowSetVersion(memRowDataBody(row), pBlock->pTableMeta->sversion);
dataRowSetLen(memRowDataBody(row), (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pBlock->boundColumnInfo.flen));
} else {
ASSERT(nBoundCols > 0);
memRowSetKvVersion(row, pBlock->pTableMeta->sversion);
kvRowSetNCols(memRowKvBody(row), nBoundCols);
kvRowSetLen(memRowKvBody(row), (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nBoundCols));
static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
ASSERT(pBlock->rowSize == pBlock->pTableMeta->tableInfo.rowSize);
return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen;
// Applicable to consume by one row
static FORCE_INLINE void appendMemRowColValEx(SMemRow row, const void *value, bool isCopyVarData, int16_t colId,
int8_t colType, int32_t toffset, int32_t *dataLen, int32_t *kvLen,
uint8_t compareStat) {
tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset);
if (compareStat == ROW_COMPARE_NEED) {
tdGetColAppendDeltaLen(value, colType, dataLen, kvLen);
static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowType, SParsedDataColInfo *spd,
int32_t idx, int32_t *toffset) {
int32_t schemaIdx = 0;
schemaIdx = spd->boundedColumns[idx];
if (isDataRowT(memRowType)) {
*toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart
} else {
*toffset = idx * sizeof(SColIdx); // the offset of SColIdx
} else {
ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx);
schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx;
if (isDataRowT(memRowType)) {
*toffset = (spd->cols + schemaIdx)->toffset;
} else {
*toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SColIdx);
static FORCE_INLINE void convertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) {
if (isDataRow(row)) {
if (kvLen < (dataLen * KVRatioConvert)) {
} else if (kvLen > dataLen) {
static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
pBlocks->tid = pTableMeta->suid;
pBlocks->uid = pTableMeta->uid;
pBlocks->sversion = pTableMeta->sversion;
if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
} else {
pBlocks->numOfRows += numOfRows;
int32_t schemaIdxCompar(const void *lhs, const void *rhs);
int32_t boundIdxCompar(const void *lhs, const void *rhs);
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols);
void destroyBoundColumnInfo(SParsedDataColInfo* pColList);
int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen);
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
SName* name, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList);
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap);
......@@ -16,4 +16,16 @@
#ifdef __cplusplus
extern "C" {
#include "parser.h"
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo);
#ifdef __cplusplus
......@@ -26,14 +26,6 @@ extern "C" {
struct SSqlNode;
typedef struct SInsertStmtInfo {
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
int8_t schemaAttached; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
char *sql; // current sql statement position
} SInsertStmtInfo;
typedef struct SInternalField {
......@@ -46,7 +46,7 @@ SInternalField* getInternalField(SFieldInfo* pFieldInfo, int32_t index);
int32_t parserValidateIdToken(SToken* pToken);
int32_t buildInvalidOperationMsg(SMsgBuf* pMsgBuf, const char* msg);
int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalInfo, const char* sourceStr);
int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr);
STableMetaInfo* addEmptyMetaInfo(SQueryStmtInfo* pQueryInfo);
......@@ -61,6 +61,8 @@ void cleanupColumnCond(SArray** pCond);
uint32_t convertRelationalOperator(SToken *pToken);
int32_t getExprFunctionId(SExprInfo *pExprInfo);
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
#ifdef __cplusplus
......@@ -44,7 +44,7 @@ typedef struct SToken {
* @param tokenType
* @return
uint32_t tGetToken(char *z, uint32_t *tokenType);
uint32_t tGetToken(const char *z, uint32_t *tokenType);
* enhanced tokenizer for sql string.
......@@ -54,7 +54,7 @@ uint32_t tGetToken(char *z, uint32_t *tokenType);
* @param isPrevOptr
* @return
SToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr);
SToken tStrGetToken(const char *str, int32_t *i, bool isPrevOptr);
* check if it is a keyword or not
......@@ -191,7 +191,7 @@ tSqlExpr *tSqlExprCreate(tSqlExpr *pLeft, tSqlExpr *pRight, int32_t optrType) {
pExpr->type = SQL_NODE_EXPR;
if (pLeft != NULL && pRight != NULL && (optrType != TK_IN)) {
char* endPos = pRight->exprToken.z + pRight->exprToken.n;
const char* endPos = pRight->exprToken.z + pRight->exprToken.n;
pExpr->exprToken.z = pLeft->exprToken.z;
pExpr->exprToken.n = (uint32_t)(endPos - pExpr->exprToken.z);
pExpr->exprToken.type = pLeft->exprToken.type;
......@@ -18,6 +18,7 @@
#include "parserUtil.h"
#include "ttoken.h"
#include "function.h"
#include "insertParser.h"
bool qIsInsertSql(const char* pStr, size_t length) {
int32_t index = 0;
......@@ -46,8 +47,8 @@ int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo**
return qParserValidateSqlNode(pCatalog, &info, *pQueryInfo, id, msg, msgLen);
int32_t qParseInsertSql(const char* pStr, size_t length, struct SInsertStmtInfo** pInsertInfo, int64_t id, char* msg, int32_t msgLen) {
return 0;
int32_t qParseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
return parseInsertSql(pContext, pInfo);
int32_t qParserConvertSql(const char* pStr, size_t length, char** pConvertSql) {
......@@ -173,7 +174,7 @@ int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMet
assert(t != NULL);
if (t->n >= TSDB_FUNC_NAME_LEN) {
return buildSyntaxErrMsg(msg, msgBufLen, "too long function name", t->z);
return buildSyntaxErrMsg(&msgBuf, "too long function name", t->z);
// Let's assume that it is an UDF/UDAF, if it is not a built-in function.
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "parserUtil.h"
#include "taosmsg.h"
#include "parser.h"
#include "taoserror.h"
......@@ -18,7 +34,6 @@ typedef struct STableFilterCond {
static STableMetaInfo* addTableMetaInfo(SQueryStmtInfo* pQueryInfo, SName* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables);
STableMeta* tableMetaDup(STableMeta* pTableMeta);
int32_t parserValidateIdToken(SToken* pToken) {
if (pToken == NULL || pToken->z == NULL || pToken->type != TK_ID) {
......@@ -87,7 +102,7 @@ int32_t buildInvalidOperationMsg(SMsgBuf* pBuf, const char* msg) {
int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalInfo, const char* sourceStr) {
int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr) {
const char* msgFormat1 = "syntax error near \'%s\'";
const char* msgFormat2 = "syntax error near \'%s\' (%s)";
const char* msgFormat3 = "%s";
......@@ -95,7 +110,7 @@ int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalIn
const char* prefix = "syntax error";
if (sourceStr == NULL) {
assert(additionalInfo != NULL);
snprintf(dst, dstBufLen, msgFormat1, additionalInfo);
snprintf(pBuf->buf, pBuf->len, msgFormat1, additionalInfo);
......@@ -103,10 +118,10 @@ int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalIn
strncpy(buf, sourceStr, tListLen(buf) - 1);
if (additionalInfo != NULL) {
snprintf(dst, dstBufLen, msgFormat2, buf, additionalInfo);
snprintf(pBuf->buf, pBuf->len, msgFormat2, buf, additionalInfo);
} else {
const char* msgFormat = (0 == strncmp(sourceStr, prefix, strlen(prefix))) ? msgFormat3 : msgFormat1;
snprintf(dst, dstBufLen, msgFormat, buf);
snprintf(pBuf->buf, pBuf->len, msgFormat, buf);
......@@ -1490,7 +1505,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild) {
return pTableMeta;
uint32_t getTableMetaSize(STableMeta* pTableMeta) {
uint32_t getTableMetaSize(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
int32_t totalCols = 0;
......@@ -1505,7 +1520,7 @@ uint32_t getTableMetaMaxSize() {
return sizeof(STableMeta) + TSDB_MAX_COLUMNS * sizeof(SSchema);
STableMeta* tableMetaDup(STableMeta* pTableMeta) {
STableMeta* tableMetaDup(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
size_t size = getTableMetaSize(pTableMeta);
......@@ -284,7 +284,7 @@ static int32_t tKeywordCode(const char* z, int n) {
* Return the length of the token that begins at z[0].
* Store the token type in *type before returning.
uint32_t tGetToken(char* z, uint32_t* tokenId) {
uint32_t tGetToken(const char* z, uint32_t* tokenId) {
uint32_t i;
switch (*z) {
case ' ':
......@@ -595,7 +595,7 @@ SToken tscReplaceStrToken(char **str, SToken *token, const char* newToken) {
return ntoken;
SToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) {
SToken t0 = {0};
// here we reach the end of sql string, null-terminated string
......@@ -689,13 +689,12 @@ void taosCleanupKeywordsTable() {
SToken taosTokenDup(SToken* pToken, char* buf, int32_t len) {
assert(pToken != NULL && buf != NULL);
SToken token = *pToken;
token.z = buf;
assert(pToken != NULL && buf != NULL && len > pToken->n);
assert(len > token.n);
strncpy(token.z, pToken->z, pToken->n);
token.z[token.n] = 0;
strncpy(buf, pToken->z, pToken->n);
buf[pToken->n] = 0;
SToken token = *pToken;
token.z = buf;
return token;
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include <gtest/gtest.h>
#include "insertParser.h"
#include "mockCatalog.h"
using namespace std;
using namespace testing;
namespace {
string toString(int32_t code) {
return tstrerror(code);
// syntax:
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
// [(field1_name, ...)]
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
class InsertTest : public Test {
void bind(const char* sql) {
cxt.pSql = sql;
cxt.sqlLen = strlen(sql);
int32_t run() {
code = parseInsertSql(&cxt, &res);
if (code != TSDB_CODE_SUCCESS) {
cout << "code:" << toString(code) << ", msg:" << errMagBuf << endl;
return code;
SInsertStmtInfo* reslut() {
return res;
static const int max_err_len = 1024;
void reset() {
memset(&cxt, 0, sizeof(cxt));
memset(errMagBuf, 0, max_err_len);
cxt.pMsg = errMagBuf;
cxt.msgLen = max_err_len;
res = nullptr;
char errMagBuf[max_err_len];
SParseContext cxt;
int32_t code;
SInsertStmtInfo* res;
// INSERT INTO tb_name VALUES (field1_value, ...)
TEST_F(InsertTest, simpleTest) {
bind("insert into .. values (...)");
SInsertStmtInfo* res = reslut();
// todo check
TEST_F(InsertTest, toleranceTest) {
bind("insert into");
bind("insert into t");
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "mockCatalog.h"
#include <iostream>
void generateMetaData(MockCatalogService* mcs) {
ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, MockCatalogService::numOfDataTypes)
.setPrecision(TSDB_TIME_PRECISION_MILLI).setVgid(1).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP);
for (int32_t i = 0; i < MockCatalogService::numOfDataTypes; ++i) {
if (TSDB_DATA_TYPE_NULL == tDataTypes[i].type) {
builder = builder.addColumn("c" + std::to_string(i + 1), tDataTypes[i].type);
ITableBuilder& builder = mcs->createTableBuilder("test", "st1", TSDB_SUPER_TABLE, MockCatalogService::numOfDataTypes, 2)
.setPrecision(TSDB_TIME_PRECISION_MILLI).setVgid(2).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP);
for (int32_t i = 0; i < MockCatalogService::numOfDataTypes; ++i) {
if (TSDB_DATA_TYPE_NULL == tDataTypes[i].type) {
builder = builder.addColumn("c" + std::to_string(i + 1), tDataTypes[i].type);
struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps) {
return mockCatalogService->getCatalogHandle(pMgmtEps);
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) {
return mockCatalogService->catalogGetMetaData(pCatalog, pMetaReq, pMetaData);
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "mockCatalogService.h"
void generateMetaData(MockCatalogService* mcs);
// mock
struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps);
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData);
#endif // MOCK_CATALOG_H
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "mockCatalogService.h"
#include <iomanip>
#include <iostream>
#include <map>
#include "ttypes.h"
std::unique_ptr<MockCatalogService> mockCatalogService;
class TableBuilder : public ITableBuilder {
virtual TableBuilder& addColumn(const std::string& name, int8_t type, int32_t bytes) {
assert(colIndex_ < meta_->tableInfo.numOfTags + meta_->tableInfo.numOfColumns);
SSchema* col = meta_->schema + colIndex_;
col->type = type;
col->colId = colIndex_++;
col->bytes = bytes;
strcpy(col->name, name.c_str());
return *this;
virtual TableBuilder& setVgid(int16_t vgid) {
meta_->vgId = vgid;
return *this;
virtual TableBuilder& setPrecision(uint8_t precision) {
meta_->tableInfo.precision = precision;
return *this;
virtual void done() {
meta_->tableInfo.rowSize = rowsize_;
friend class MockCatalogServiceImpl;
static std::unique_ptr<TableBuilder> createTableBuilder(int8_t tableType, int32_t numOfColumns, int32_t numOfTags) {
STableMeta* meta = (STableMeta*)std::calloc(1, sizeof(STableMeta) + sizeof(SSchema) * (numOfColumns + numOfTags));
if (nullptr == meta) {
throw std::bad_alloc();
meta->tableType = tableType;
meta->tableInfo.numOfTags = numOfTags;
meta->tableInfo.numOfColumns = numOfColumns;
return std::unique_ptr<TableBuilder>(new TableBuilder(meta));
TableBuilder(STableMeta* meta) : colIndex_(0), rowsize_(0), meta_(meta) {
STableMeta* table() {
return meta_;
int32_t colIndex_;
int32_t rowsize_;
STableMeta* meta_;
class MockCatalogServiceImpl {
static const int32_t numOfDataTypes = sizeof(tDataTypes) / sizeof(tDataTypes[0]);
MockCatalogServiceImpl() {
struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps) {
return (struct SCatalog*)0x01;
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) {
return 0;
TableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags) {
builder_ = TableBuilder::createTableBuilder(tableType, numOfColumns, numOfTags);
meta_[db][tbname]->uid = id_++;
return *(builder_.get());
void showTables() const {
// number of forward fills
#define NOF(n) ((n) / 2)
// number of backward fills
#define NOB(n) ((n) % 2 ? (n) / 2 + 1 : (n) / 2)
// center aligned
#define CA(n, s) std::setw(NOF((n) - (s).length())) << "" << (s) << std::setw(NOB((n) - (s).length())) << "" << "|"
// string field length
#define SFL 20
// string field header
#define SH(h) CA(SFL, std::string(h))
// string field
#define SF(n) CA(SFL, n)
// integer field length
#define IFL 10
// integer field header
#define IH(i) CA(IFL, std::string(i))
// integer field
#define IF(i) CA(IFL, std::to_string(i))
// split line
#define SL(sn, in) std::setfill('=') << std::setw((sn) * (SFL + 1) + (in) * (IFL + 1)) << "" << std::setfill(' ')
for (const auto& db : meta_) {
std::cout << SH("Database") << SH("Table") << SH("Type") << SH("Precision") << IH(std::string("Vgid")) << std::endl;
std::cout << SL(4, 1) << std::endl;
for (const auto& table : db.second) {
std::cout << SF(db.first) << SF(table.first) << SF(ttToString(table.second->tableType)) << SF(pToString(table.second->tableInfo.precision)) << IF(table.second->vgId) << std::endl;
// int16_t numOfFields = table.second->tableInfo.numOfTags + table.second->tableInfo.numOfColumns;
// for (int16_t i = 0; i < numOfFields; ++i) {
// const SSchema* schema = table.second->schema + i;
// std::cout << schema->name << " " << schema->type << " " << schema->bytes << std::endl;
// }
std::string ttToString(int8_t tableType) const {
switch (tableType) {
return "super table";
return "child table";
return "normal table";
return "unknown";
std::string pToString(uint8_t precision) const {
switch (precision) {
return "millisecond";
return "microsecond";
return "nanosecond";
return "unknown";
uint64_t id_;
std::unique_ptr<TableBuilder> builder_;
std::map<std::string, std::map<std::string, std::shared_ptr<STableMeta> > > meta_;
MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {
MockCatalogService::~MockCatalogService() {
struct SCatalog* MockCatalogService::getCatalogHandle(const SEpSet* pMgmtEps) {
return impl_->getCatalogHandle(pMgmtEps);
int32_t MockCatalogService::catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) {
return impl_->catalogGetMetaData(pCatalog, pMetaReq, pMetaData);
ITableBuilder& MockCatalogService::createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags) {
return impl_->createTableBuilder(db, tbname, tableType, numOfColumns, numOfTags);
void MockCatalogService::showTables() const {
\ No newline at end of file
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include <memory>
#include <string>
#include "catalog.h"
class ITableBuilder {
ITableBuilder& addTag(const std::string& name, int8_t type) {
return addColumn(name, type, tDataTypes[type].bytes);
ITableBuilder& addTag(const std::string& name, int8_t type, int32_t bytes) {
return addColumn(name, type, bytes);
ITableBuilder& addColumn(const std::string& name, int8_t type) {
return addColumn(name, type, tDataTypes[type].bytes);
virtual ITableBuilder& addColumn(const std::string& name, int8_t type, int32_t bytes) = 0;
virtual ITableBuilder& setVgid(int16_t vgid) = 0;
virtual ITableBuilder& setPrecision(uint8_t precision) = 0;
virtual void done() = 0;
class MockCatalogServiceImpl;
class MockCatalogService {
static const int32_t numOfDataTypes = sizeof(tDataTypes) / sizeof(tDataTypes[0]);
struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps);
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData);
ITableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags = 0);
void showTables() const;
std::unique_ptr<MockCatalogServiceImpl> impl_;
extern std::unique_ptr<MockCatalogService> mockCatalogService;
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include <string>
#include <gtest/gtest.h>
#include "mockCatalog.h"
class ParserEnv : public testing::Environment {
virtual void SetUp() {
mockCatalogService.reset(new MockCatalogService());
virtual void TearDown() {
ParserEnv() {}
virtual ~ParserEnv() {}
int main(int argc, char* argv[]) {
testing::AddGlobalTestEnvironment(new ParserEnv());
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
......@@ -79,11 +79,6 @@ static void _init_tvariant_nchar(SVariant* t) {
t->nLen = twcslen(t->wpz);
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
TEST(testCase, validateToken_test) {
char t01[] = "abc";
EXPECT_EQ(testValidateName(t01), TSDB_CODE_SUCCESS);
......@@ -18,6 +18,7 @@
#include "wal.h"
#include "compare.h"
#include "tchecksum.h"
#ifdef __cplusplus
extern "C" {
......@@ -32,6 +33,11 @@ typedef struct WalFileInfo {
int64_t fileSize;
} WalFileInfo;
typedef struct WalIdxEntry {
int64_t ver;
int64_t offset;
} WalIdxEntry;
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft;
WalFileInfo* pInfoRight = (WalFileInfo*)pRight;
......@@ -79,6 +85,26 @@ static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) {
return sprintf(buf, "%s/%" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer);
static inline int walValidHeadCksum(SWalHead* pHead) {
return taosCheckChecksum((uint8_t*)&pHead->head, sizeof(SWalReadHead), pHead->cksumHead);
static inline int walValidBodyCksum(SWalHead* pHead) {
return taosCheckChecksum((uint8_t*)pHead->head.cont, pHead->head.len, pHead->cksumBody);
static inline int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) {
return walValidHeadCksum(pHead) && walValidBodyCksum(pHead);
static inline uint32_t walCalcHeadCksum(SWalHead *pHead) {
return taosCalcChecksum(0, (uint8_t*)&pHead->head, sizeof(SWalReadHead));
static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) {
return taosCalcChecksum(0, (uint8_t*)body, len);
int walReadMeta(SWal* pWal);
int walWriteMeta(SWal* pWal);
int walRollFileInfo(SWal* pWal);
......@@ -87,6 +113,10 @@ char* walMetaSerialize(SWal* pWal);
int walMetaDeserialize(SWal* pWal, const char* bytes);
//meta section end
//seek section
int walChangeFile(SWal *pWal, int64_t ver);
//seek section end
int64_t walGetSeq();
int walSeekVer(SWal *pWal, int64_t ver);
int walRoll(SWal *pWal);
......@@ -24,6 +24,18 @@
#include <libgen.h>
#include <regex.h>
int64_t walGetFirstVer(SWal *pWal) {
return pWal->firstVersion;
int64_t walGetSnaphostVer(SWal *pWal) {
return pWal->snapshotVersion;
int64_t walGetLastVer(SWal *pWal) {
return pWal->lastVersion;
int walRollFileInfo(SWal* pWal) {
int64_t ts = taosGetTimestampSec();
......@@ -82,6 +82,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->writeLogTfd = -1;
pWal->writeIdxTfd = -1;
pWal->writeCur = -1;
//set config
pWal->vgId = pCfg->vgId;
......@@ -90,13 +91,20 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->segSize = pCfg->segSize;
pWal->level = pCfg->walLevel;
//init status
//init version info
pWal->firstVersion = -1;
pWal->commitVersion = -1;
pWal->snapshotVersion = -1;
pWal->lastVersion = -1;
pWal->snapshottingVer = -1;
//init status
pWal->lastRollSeq = -1;
//init write buffer
memset(&pWal->head, 0, sizeof(SWalHead));
pWal->head.sver = 0;
pWal->head.head.sver = 0;
tstrncpy(pWal->path, path, sizeof(pWal->path));
pthread_mutex_init(&pWal->mutex, NULL);
......@@ -15,19 +15,6 @@
#include "walInt.h"
#include "tfile.h"
#include "tchecksum.h"
static inline int walValidHeadCksum(SWalHead* pHead) {
return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead);
static inline int walValidBodyCksum(SWalHead* pHead) {
return taosCheckChecksum((uint8_t*)pHead->cont, pHead->len, pHead->cksumBody);
static int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) {
return walValidHeadCksum(pHead) && walValidBodyCksum(pHead);
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
int code;
......@@ -49,13 +36,13 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
if(walValidHeadCksum(*ppHead) != 0) {
return -1;
void* ptr = realloc(*ppHead, sizeof(SWalHead) + (*ppHead)->len);
void* ptr = realloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len);
if(ptr == NULL) {
*ppHead = NULL;
return -1;
if(tfRead(pWal->writeLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) {
if(tfRead(pWal->writeLogTfd, (*ppHead)->head.cont, (*ppHead)->head.len) != (*ppHead)->head.len) {
return -1;
//TODO: endian compatibility processing after read
......@@ -69,18 +56,3 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
return 0;
int64_t walGetFirstVer(SWal *pWal) {
if (pWal == NULL) return 0;
return pWal->firstVersion;
int64_t walGetSnaphostVer(SWal *pWal) {
if (pWal == NULL) return 0;
return pWal->snapshotVersion;
int64_t walGetLastVer(SWal *pWal) {
if (pWal == NULL) return 0;
return pWal->lastVersion;
......@@ -43,12 +43,35 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
if (code != 0) {
return -1;
/*pWal->curLogOffset = readBuf[1];*/
pWal->curVersion = ver;
return code;
static int walChangeFile(SWal *pWal, int64_t ver) {
int walChangeFileToLast(SWal *pWal) {
int64_t idxTfd, logTfd;
WalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
int64_t fileFirstVer = pRet->firstVer;
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, fileFirstVer, fnameStr);
idxTfd = tfOpenReadWrite(fnameStr);
if(idxTfd < 0) {
return -1;
walBuildLogName(pWal, fileFirstVer, fnameStr);
logTfd = tfOpenReadWrite(fnameStr);
if(logTfd < 0) {
return -1;
//switch file
pWal->writeIdxTfd = idxTfd;
pWal->writeLogTfd = logTfd;
//change status
pWal->curStatus = WAL_CUR_FILE_WRITABLE;
return 0;
int walChangeFile(SWal *pWal, int64_t ver) {
int code = 0;
int64_t idxTfd, logTfd;
char fnameStr[WAL_FILE_LEN];
......@@ -86,21 +109,21 @@ static int walChangeFile(SWal *pWal, int64_t ver) {
return code;
int walGetVerOffset(SWal* pWal, int64_t ver) {
int code;
return 0;
int walSeekVer(SWal *pWal, int64_t ver) {
int code;
if((!(pWal->curStatus & WAL_CUR_FAILED)) && ver == pWal->curVersion) {
if(ver == pWal->lastVersion) {
return 0;
if(ver > pWal->lastVersion) {
//TODO: some records are skipped
return -1;
if(ver < pWal->firstVersion) {
//TODO: try to seek pruned log
if(ver > pWal->lastVersion || ver < pWal->firstVersion) {
return -1;
if(ver < pWal->snapshotVersion) {
//TODO: seek snapshotted log, invalid in some cases
//TODO: set flag to prevent roll back
if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
code = walChangeFile(pWal, ver);
......@@ -21,65 +21,6 @@
#include "tfile.h"
#include "walInt.h"
static void walFtruncate(SWal *pWal, int64_t ver);
int32_t walCommit(SWal *pWal, int64_t ver) {
ASSERT(pWal->snapshotVersion <= pWal->commitVersion);
ASSERT(pWal->commitVersion <= pWal->lastVersion);
ASSERT(ver >= pWal->commitVersion);
ASSERT(ver <= pWal->lastVersion);
pWal->commitVersion = ver;
return 0;
int32_t walRollback(SWal *pWal, int64_t ver) {
//TODO: ftruncate
ASSERT(ver > pWal->commitVersion);
ASSERT(ver <= pWal->lastVersion);
//seek position
walSeekVer(pWal, ver);
walFtruncate(pWal, ver);
return 0;
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
pWal->snapshotVersion = ver;
int ts = taosGetTimestampSec();
int deleteCnt = 0;
int64_t newTotSize = pWal->totSize;
WalFileInfo tmp;
tmp.firstVer = ver;
//mark files safe to delete
WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
//iterate files, until the searched result
for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if(pWal->totSize > pWal->retentionSize ||
iter->closeTs + pWal->retentionPeriod > ts) {
//delete according to file size or close time
newTotSize -= iter->fileSize;
char fnameStr[WAL_FILE_LEN];
//remove file
for(int i = 0; i < deleteCnt; i++) {
WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
//save snapshot ver, commit ver
//make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
pWal->totSize = newTotSize;
return 0;
#if 0
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
......@@ -172,6 +113,185 @@ void walRemoveAllOldFiles(void *handle) {
int32_t walCommit(SWal *pWal, int64_t ver) {
ASSERT(pWal->commitVersion >= pWal->snapshotVersion);
ASSERT(pWal->commitVersion <= pWal->lastVersion);
if(ver < pWal->commitVersion || ver > pWal->lastVersion) {
return -1;
pWal->commitVersion = ver;
return 0;
int32_t walRollback(SWal *pWal, int64_t ver) {
int code;
char fnameStr[WAL_FILE_LEN];
if(ver == pWal->lastVersion) {
return 0;
if(ver > pWal->lastVersion || ver < pWal->commitVersion) {
return -1;
//find correct file
if(ver < walGetLastFileFirstVer(pWal)) {
//close current files
//open old files
code = walChangeFile(pWal, ver);
if(code != 0) {
return -1;
//delete files
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
for(int i = pWal->writeCur; i < fileSetSize; i++) {
walBuildLogName(pWal, ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
walBuildIdxName(pWal, ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
//pop from fileInfoSet
taosArraySetSize(pWal->fileInfoSet, pWal->writeCur + 1);
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
int64_t idxTfd = tfOpenReadWrite(fnameStr);
//change to deserialize function
if(idxTfd < 0) {
return -1;
int idxOff = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE;
code = tfLseek(idxTfd, idxOff, SEEK_SET);
if(code < 0) {
return -1;
//read idx file and get log file pos
//TODO:change to deserialize function
WalIdxEntry entry;
if(tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) {
return -1;
ASSERT(entry.ver == ver);
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
int64_t logTfd = tfOpenReadWrite(fnameStr);
if(logTfd < 0) {
return -1;
code = tfLseek(logTfd, entry.offset, SEEK_SET);
if(code < 0) {
return -1;
//validate offset
SWalHead head;
int size = tfRead(logTfd, &head, sizeof(SWalHead));
if(size != sizeof(SWalHead)) {
return -1;
code = walValidHeadCksum(&head);
ASSERT(code == 0);
if(code != 0) {
return -1;
if(head.head.version != ver) {
return -1;
//truncate old files
code = tfFtruncate(logTfd, entry.offset);
if(code < 0) {
return -1;
code = tfFtruncate(idxTfd, idxOff);
if(code < 0) {
return -1;
pWal->lastVersion = ver - 1;
((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
return 0;
int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
pWal->snapshottingVer = ver;
//check file rolling
if(pWal->retentionPeriod == 0) {
return 0;
int32_t walEndTakeSnapshot(SWal *pWal) {
int64_t ver = pWal->snapshottingVer;
if(ver == -1) return -1;
pWal->snapshotVersion = ver;
int ts = taosGetTimestampSec();
int deleteCnt = 0;
int64_t newTotSize = pWal->totSize;
WalFileInfo tmp;
tmp.firstVer = ver;
//find files safe to delete
WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
if(ver >= pInfo->lastVer) {
//iterate files, until the searched result
for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if(pWal->totSize > pWal->retentionSize ||
iter->closeTs + pWal->retentionPeriod > ts) {
//delete according to file size or close time
newTotSize -= iter->fileSize;
char fnameStr[WAL_FILE_LEN];
//remove file
for(int i = 0; i < deleteCnt; i++) {
WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
//make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->firstVersion = -1;
} else {
pWal->firstVersion = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
pWal->totSize = newTotSize;
pWal->snapshottingVer = -1;
//save snapshot ver, commit ver
int code = walWriteMeta(pWal);
if(code != 0) {
return -1;
return 0;
int walRoll(SWal *pWal) {
int code = 0;
if(pWal->writeIdxTfd != -1) {
......@@ -211,6 +331,7 @@ int walRoll(SWal *pWal) {
//switch file
pWal->writeIdxTfd = idxTfd;
pWal->writeLogTfd = logTfd;
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
//change status
......@@ -218,32 +339,6 @@ int walRoll(SWal *pWal) {
return 0;
int walChangeFileToLast(SWal *pWal) {
int64_t idxTfd, logTfd;
WalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
int64_t fileFirstVer = pRet->firstVer;
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, fileFirstVer, fnameStr);
idxTfd = tfOpenReadWrite(fnameStr);
if(idxTfd < 0) {
return -1;
walBuildLogName(pWal, fileFirstVer, fnameStr);
logTfd = tfOpenReadWrite(fnameStr);
if(logTfd < 0) {
return -1;
//switch file
pWal->writeIdxTfd = idxTfd;
pWal->writeLogTfd = logTfd;
//change status
pWal->curVersion = fileFirstVer;
pWal->curStatus = WAL_CUR_FILE_WRITABLE;
return 0;
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
int code = 0;
//get index file
......@@ -253,9 +348,11 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
return code;
int64_t writeBuf[2] = { ver, offset };
int size = tfWrite(pWal->writeIdxTfd, writeBuf, sizeof(writeBuf));
if(size != sizeof(writeBuf)) {
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
WalIdxEntry entry = { .ver = ver, .offset = offset };
int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(WalIdxEntry));
if(size != sizeof(WalIdxEntry)) {
return -1;
return 0;
......@@ -270,13 +367,14 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
if (index == pWal->lastVersion + 1) {
if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->firstVersion = index;
code = walRoll(pWal);
ASSERT(code == 0);
} else {
int64_t passed = walGetSeq() - pWal->lastRollSeq;
if(pWal->rollPeriod != -1 && passed > pWal->rollPeriod) {
if(pWal->rollPeriod != -1 && pWal->rollPeriod != 0 && passed > pWal->rollPeriod) {
} else if(pWal->segSize != -1 && walGetLastFileSize(pWal) > pWal->segSize) {
} else if(pWal->segSize != -1 && pWal->segSize != 0 && walGetLastFileSize(pWal) > pWal->segSize) {
......@@ -287,16 +385,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
/*if (!tfValid(pWal->curLogTfd)) return 0;*/
pWal->head.version = index;
pWal->head.signature = WAL_SIGNATURE;
pWal->head.len = bodyLen;
pWal->head.msgType = msgType;
pWal->head.cksumHead = taosCalcChecksum(0, (const uint8_t*)&pWal->head, sizeof(SWalHead)- sizeof(uint32_t)*2);
pWal->head.cksumBody = taosCalcChecksum(0, (const uint8_t*)&body, bodyLen);
pWal->head.head.version = index;
pWal->head.head.len = bodyLen;
pWal->head.head.msgType = msgType;
pWal->head.cksumHead = walCalcHeadCksum(&pWal->head);
pWal->head.cksumBody = walCalcBodyCksum(body, bodyLen);
if (tfWrite(pWal->writeLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) {
......@@ -312,6 +407,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal));
if(code != 0) {
return -1;
//set status
......@@ -326,8 +422,6 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
void walFsync(SWal *pWal, bool forceFsync) {
if (pWal == NULL || !tfValid(pWal->writeLogTfd)) return;
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal));
if (tfFsync(pWal->writeLogTfd) < 0) {
......@@ -408,7 +502,7 @@ static int walValidateOffset(SWal* pWal, int64_t ver) {
int code = 0;
SWalHead *pHead = NULL;
code = (int)walRead(pWal, &pHead, ver);
if(pHead->version != ver) {
if(pHead->head.version != ver) {
return -1;
return 0;
......@@ -428,15 +522,6 @@ static int64_t walGetOffset(SWal* pWal, int64_t ver) {
return 0;
static void walFtruncate(SWal *pWal, int64_t ver) {
int64_t tfd = pWal->writeLogTfd;
tfFtruncate(tfd, ver);
tfd = pWal->writeIdxTfd;
tfFtruncate(tfd, ver * WAL_IDX_ENTRY_SIZE);
#if 0
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
int64_t pos = *offset;
......@@ -36,6 +36,36 @@ class WalCleanEnv : public ::testing::Test {
const char* pathName = "/tmp/wal_test";
class WalCleanDeleteEnv : public ::testing::Test {
static void SetUpTestCase() {
int code = walInit();
ASSERT(code == 0);
static void TearDownTestCase() {
void SetUp() override {
SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal));
memset(pCfg, 0, sizeof(SWalCfg));
pCfg->retentionPeriod = 0;
pCfg->walLevel = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg);
void TearDown() override {
pWal = NULL;
SWal* pWal = NULL;
const char* pathName = "/tmp/wal_test";
class WalKeepEnv : public ::testing::Test {
static void SetUpTestCase() {
......@@ -110,40 +140,94 @@ TEST_F(WalCleanEnv, removeOldMeta) {
ASSERT(code == 0);
TEST_F(WalKeepEnv, readOldMeta) {
int code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walWriteMeta(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walWriteMeta(pWal);
ASSERT(code == 0);
char*oldss = walMetaSerialize(pWal);
code = walReadMeta(pWal);
ASSERT(code == 0);
char* newss = walMetaSerialize(pWal);
int len = strlen(oldss);
ASSERT_EQ(len, strlen(newss));
for(int i = 0; i < len; i++) {
EXPECT_EQ(oldss[i], newss[i]);
//TEST_F(WalKeepEnv, readOldMeta) {
//int code = walRollFileInfo(pWal);
//ASSERT(code == 0);
//code = walWriteMeta(pWal);
//ASSERT(code == 0);
//code = walRollFileInfo(pWal);
//ASSERT(code == 0);
//code = walWriteMeta(pWal);
//ASSERT(code == 0);
//char*oldss = walMetaSerialize(pWal);
//code = walReadMeta(pWal);
//ASSERT(code == 0);
//char* newss = walMetaSerialize(pWal);
//int len = strlen(oldss);
//ASSERT_EQ(len, strlen(newss));
//for(int i = 0; i < len; i++) {
//EXPECT_EQ(oldss[i], newss[i]);
TEST_F(WalCleanEnv, write) {
const char* ranStr = "tvapq02tcp";
const int len = strlen(ranStr);
int code;
for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, i);
code = walWrite(pWal, i+2, i, (void*)ranStr, len);
ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->lastVersion, i);
code = walWriteMeta(pWal);
ASSERT_EQ(code, 0);
TEST_F(WalKeepEnv, write) {
TEST_F(WalCleanEnv, rollback) {
const char* ranStr = "tvapq02tcp";
const int len = strlen(ranStr);
int code;
for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len);
ASSERT_EQ(code, 0);
code = walWrite(pWal, i+2, i, (void*)ranStr, len);
ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->lastVersion, i);
code = walRollback(pWal, 5);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, 4);
code = walRollback(pWal, 3);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, 2);
code = walWriteMeta(pWal);
ASSERT_EQ(code, 0);
TEST_F(WalCleanDeleteEnv, roll) {
const char* ranStr = "tvapq02tcp";
const int len = strlen(ranStr);
int code;
int i;
for(i = 0; i < 100; i++) {
code = walWrite(pWal, i, 0, (void*)ranStr, len);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, i);
code = walCommit(pWal, i);
ASSERT_EQ(pWal->commitVersion, i);
walBeginTakeSnapshot(pWal, i-1);
ASSERT_EQ(pWal->snapshottingVer, i-1);
ASSERT_EQ(pWal->snapshotVersion, i-1);
ASSERT_EQ(pWal->snapshottingVer, -1);
code = walWrite(pWal, 5, 0, (void*)ranStr, len);
ASSERT_NE(code, 0);
for(; i < 200; i++) {
code = walWrite(pWal, i, 0, (void*)ranStr, len);
ASSERT_EQ(code, 0);
code = walCommit(pWal, i);
ASSERT_EQ(pWal->commitVersion, i);
code = walWriteMeta(pWal);
ASSERT_EQ(code, 0);
......@@ -37,7 +37,7 @@ char *taosCharsetReplace(char *charsetstr) {
return strdup(charsetstr);
int64_t taosStr2int64(char *str) {
int64_t taosStr2int64(const char *str) {
char *endptr = NULL;
return strtoll(str, &endptr, 10);
......@@ -107,7 +107,7 @@ int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs) {
return len;
bool taosMbsToUcs4(char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len, int32_t *len) {
bool taosMbsToUcs4(const char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len, int32_t *len) {
memset(ucs4, 0, ucs4_max_len);
mbstate_t state = {0};
int32_t retlen = mbsnrtowcs((wchar_t *)ucs4, (const char **)&mbs, mbsLength, ucs4_max_len / 4, &state);
......@@ -241,12 +241,16 @@ void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
assert(cnt <= pArray->size);
pArray->size = pArray->size - cnt;
if(pArray->size == 0) {
pArray->size = 0;
memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size);
void taosArrayPopTailBatch(SArray* pArray, size_t cnt) {
assert(cnt <= pArray->size);
pArray->size = pArray->size - cnt;
void taosArrayRemove(SArray* pArray, size_t index) {
assert(index < pArray->size);
......@@ -329,6 +333,11 @@ void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t compa
return taosbsearch(key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags);
int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags) {
void* item = taosArraySearch(pArray, key, comparFn, flags);
return (int32_t)((char*)item - (char*)pArray->pData) / pArray->elemSize;
void taosArraySortString(SArray* pArray, __compar_fn_t comparFn) {
assert(pArray != NULL);
qsort(pArray->pData, pArray->size, pArray->elemSize, comparFn);
......@@ -166,7 +166,7 @@ char **strsplit(char *z, const char *delim, int32_t *num) {
return split;
char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) {
char *strnchr(const char *haystack, char needle, int32_t len, bool skipquote) {
for (int32_t i = 0; i < len; ++i) {
// skip the needle in quote, jump to the end of quoted string
......@@ -179,7 +179,7 @@ char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) {
if (haystack[i] == needle) {
return &haystack[i];
return (char *)&haystack[i];
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册