diff --git a/.gitignore b/.gitignore index ab4fbda71f5828126c75b363120b5df0a93a9852..2308ea78969849379d019f18b238ce496dbdc29d 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ debug/ release/ target/ debs/ +deps/ rpms/ mac/ *.pyc diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index a3de2452acff4148ff4a81223fcdd7e64797893d..c6ef6c513fd7cdec335ea8c0c9d8b51edaee85f6 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -544,7 +544,7 @@ void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder); void tdResetKVRowBuilder(SKVRowBuilder *pBuilder); SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder); -static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) { +static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, const void *value) { if (pBuilder->nCols >= pBuilder->tCols) { pBuilder->tCols *= 2; SColIdx* pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols); diff --git a/include/common/ttime.h b/include/common/ttime.h index 926d23c23994f7c726c2ac53d4aa230fa84baaeb..ed6496aca670bb04f001e9a6f610cd1951e0dec5 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -44,10 +44,10 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision); int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision); -int32_t parseAbsoluteDuration(char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision); +int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision); int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision); -int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth); +int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth); void deltaToUtcInitOnce(); int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision); diff --git a/include/common/tvariant.h b/include/common/tvariant.h index 552c7eaa3ffc7a01c2f3415653b12f39111879d5..e068c6eb6ca4cc56b3eae27d5ca213517ad86752 100644 --- a/include/common/tvariant.h +++ b/include/common/tvariant.h @@ -40,7 +40,7 @@ int32_t toInteger(const char* z, int32_t n, int32_t base, int64_t* value, bool* bool taosVariantIsValid(SVariant *pVar); -void taosVariantCreate(SVariant *pVar, char* z, int32_t n, int32_t type); +void taosVariantCreate(SVariant *pVar, const char* z, int32_t n, int32_t type); void taosVariantCreateFromBinary(SVariant *pVar, const char *pz, size_t len, uint32_t type); diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 0e6c352d71e736dc08fca4e1d5e344180a99fda8..e2817baeae93e3b7210188c7ad007ed88013d7dd 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -131,6 +131,18 @@ struct SInsertStmtInfo; */ bool qIsInsertSql(const char* pStr, size_t length); +typedef struct SParseContext { + const char* pSql; // sql string + size_t sqlLen; // length of the sql string + int64_t id; // operator id, generated by uuid generator + const char* pDbname; + const SEpSet* pEpSet; + int8_t schemaAttached; // denote if submit block is built with table schema or not + + char* pMsg; // extended error message if exists to help avoid the problem in sql statement. + int32_t msgLen; // max length of the msg +} SParseContext; + /** * Parse the sql statement and then return the SQueryStmtInfo as the result of bounded AST. * @param pSql sql string @@ -141,16 +153,35 @@ bool qIsInsertSql(const char* pStr, size_t length); */ int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo** pQueryInfo, int64_t id, char* msg, int32_t msgLen); +typedef enum { + PAYLOAD_TYPE_KV = 0, + PAYLOAD_TYPE_RAW = 1, +} EPayloadType; + +typedef struct SVgDataBlocks { + int64_t vgId; // virtual group id + int32_t numOfTables; // number of tables in current submit block + uint32_t size; + char *pData; +} SVgDataBlocks; + +typedef struct SInsertStmtInfo { + SArray* pDataBlocks; // data block for each vgroup, SArray. + int8_t schemaAttache; // denote if submit block is built with table schema or not + uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert + uint32_t insertType; // insert data from [file|sql statement| bound statement] + const char* sql; // current sql statement position +} SInsertStmtInfo; + /** * Parse the insert sql statement. * @param pStr sql string * @param length length of the sql string - * @param pInsertParam data in binary format to submit to vnode directly. * @param id operator id, generated by uuid generator. * @param msg extended error message if exists to help avoid the problem in sql statement. - * @return + * @return data in binary format to submit to vnode directly. */ -int32_t qParseInsertSql(const char* pStr, size_t length, struct SInsertStmtInfo** pInsertInfo, int64_t id, char* msg, int32_t msgLen); + int32_t qParseInsertSql(SParseContext* pContext, struct SInsertStmtInfo** pInfo); /** * Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that diff --git a/include/os/osString.h b/include/os/osString.h index 358324075fab292657a207c37c3d1292a137b0d9..582411d4440e7b1539c4b926413c7ed9751dfe62 100644 --- a/include/os/osString.h +++ b/include/os/osString.h @@ -38,11 +38,11 @@ extern "C" { (dst)[(size)-1] = 0; \ } while (0) -int64_t taosStr2int64(char *str); +int64_t taosStr2int64(const char *str); // USE_LIBICONV int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs); -bool taosMbsToUcs4(char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len, int32_t *len); +bool taosMbsToUcs4(const char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len, int32_t *len); int32_t tasoUcs4Compare(void *f1_ucs4, void *f2_ucs4, int32_t bytes, int8_t ncharSize); bool taosValidateEncodec(const char *encodec); char * taosCharsetReplace(char *charsetstr); diff --git a/include/util/tutil.h b/include/util/tutil.h index e4e66901d34ec2fc56d41751dfc13fb5b3c1428c..3a07b898c0e514f890fe05bd91a10cc5c7144926 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -29,7 +29,7 @@ int32_t strdequote(char *src); int32_t strndequote(char *dst, const char *z, int32_t len); int32_t strRmquote(char *z, int32_t len); size_t strtrim(char *src); -char *strnchr(char *haystack, char needle, int32_t len, bool skipquote); +char *strnchr(const char *haystack, char needle, int32_t len, bool skipquote); char **strsplit(char *src, const char *delim, int32_t *num); char *strtolower(char *dst, const char *src); char *strntolower(char *dst, const char *src, int32_t n); diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 6c795bb0cc1d8e34616553ac0a79c5a0175558b6..5c6b9ff6f33e382f10fb2045df9e5ec432b28732 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -82,18 +82,18 @@ void deltaToUtcInitOnce() { } static int64_t parseFraction(char* str, char** end, int32_t timePrec); -static int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec, char delim); +static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim); static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec); static int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec); static char* forwardToTimeStringEnd(char* str); -static bool checkTzPresent(char *str, int32_t len); +static bool checkTzPresent(const char *str, int32_t len); static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t timePrec) = { parseLocaltime, parseLocaltimeDst }; -int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) { +int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) { /* parse datatime string in with tz */ if (strnchr(timestr, 'T', len, false) != NULL) { return parseTimeWithTz(timestr, time, timePrec, 'T'); @@ -104,7 +104,7 @@ int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePre } } -bool checkTzPresent(char *str, int32_t len) { +bool checkTzPresent(const char *str, int32_t len) { char *seg = forwardToTimeStringEnd(str); int32_t seg_len = len - (int32_t)(seg - str); @@ -237,7 +237,7 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) { * 2013-04-12T15:52:01+0800 * 2013-04-12T15:52:01.123+0800 */ -int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec, char delim) { +int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim) { int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); @@ -432,7 +432,7 @@ static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t time * d - Days (24 hours) * w - Weeks (7 days) */ -int32_t parseAbsoluteDuration(char* token, int32_t tokenlen, int64_t* duration, char* unit, int32_t timePrecision) { +int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* duration, char* unit, int32_t timePrecision) { errno = 0; char* endPtr = NULL; diff --git a/source/common/src/tvariant.c b/source/common/src/tvariant.c index ed1f4d2bfd7826514a5d9a18665f33d8b01aed54..8a5a300fcfb79dd6f04f1d203469ebe827bb5cc8 100644 --- a/source/common/src/tvariant.c +++ b/source/common/src/tvariant.c @@ -75,7 +75,7 @@ int32_t toInteger(const char* z, int32_t n, int32_t base, int64_t* value, bool* return 0; } -void taosVariantCreate(SVariant *pVar, char* z, int32_t n, int32_t type) { +void taosVariantCreate(SVariant *pVar, const char* z, int32_t n, int32_t type) { int32_t ret = 0; memset(pVar, 0, sizeof(SVariant)); diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index eb288e0aa2b021c080b1faf156f7960ca1e3b0ca..b26bd227f828969db17b6731b5ffbf1d7b06ff32 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -275,6 +275,7 @@ FstNode* fstGetRoot(Fst *fst); FstType fstGetType(Fst *fst); CompiledAddr fstGetRootAddr(Fst *fst); Output fstEmptyFinalOutput(Fst *fst, bool *null); + bool fstVerify(Fst *fst); @@ -291,11 +292,11 @@ typedef struct StreamState { void streamStateDestroy(void *s); typedef struct StreamWithState { - Fst *fst; - Automation *aut; - SArray *inp; - FstOutput emptyOutput; - SArray *stack; // + Fst *fst; + AutomationCtx *aut; + SArray *inp; + FstOutput emptyOutput; + SArray *stack; // FstBoundWithData *endAt; } StreamWithState ; @@ -310,19 +311,19 @@ StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *sta void swsResultDestroy(StreamWithStateResult *result); typedef void* (*StreamCallback)(void *); -StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) ; +StreamWithState *streamWithStateCreate(Fst *fst, AutomationCtx *automation, FstBoundWithData *min, FstBoundWithData *max) ; void streamWithStateDestroy(StreamWithState *sws); bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min); StreamWithStateResult* streamWithStateNextWith(StreamWithState *sws, StreamCallback callback); typedef struct FstStreamBuilder { Fst *fst; - Automation *aut; + AutomationCtx *aut; FstBoundWithData *min; FstBoundWithData *max; } FstStreamBuilder; -FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut); +FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, AutomationCtx *aut); // set up bound range // refator, simple code by marco diff --git a/source/libs/index/inc/index_fst_automation.h b/source/libs/index/inc/index_fst_automation.h index 480d3110c418766d8f99e72bffeed0256451e845..4a29f2e3a6df82235afa343f9745bac8e95e4883 100644 --- a/source/libs/index/inc/index_fst_automation.h +++ b/source/libs/index/inc/index_fst_automation.h @@ -19,33 +19,40 @@ extern "C" { #endif +#include "index_fst_util.h" typedef struct AutomationCtx AutomationCtx; +typedef enum AutomationType { + AUTOMATION_PREFIX, + AUTMMATION_MATCH +} AutomationType; + typedef struct StartWith { AutomationCtx *autoSelf; } StartWith; typedef struct Complement { AutomationCtx *autoSelf; - } Complement; // automation typedef struct AutomationCtx { -// automation interface - void *data; + AutomationType type; } AutomationCtx; -typedef struct Automation { - void* (*start)() ; - bool (*isMatch)(void *); - bool (*canMatch)(void *data); - bool (*willAlwaysMatch)(void *state); - void* (*accept)(void *state, uint8_t byte); - void* (*acceptEof)(void *state); - void *data; -} Automation; +typedef struct AutomationFunc { + void* (*start)(AutomationCtx *ctx) ; + bool (*isMatch)(AutomationCtx *ctx, void *); + bool (*canMatch)(AutomationCtx *ctx, void *data); + bool (*willAlwaysMatch)(AutomationCtx *ctx, void *state); + void* (*accept)(AutomationCtx *ctx, void *state, uint8_t byte); + void* (*acceptEof)(AutomationCtx *ct, void *state); +} AutomationFunc; + +AutomationCtx *automCtxCreate(void *data, AutomationType type); +void autoCtxDestroy(AutomationCtx *ctx); +extern AutomationFunc automFuncs[]; #ifdef __cplusplus } #endif diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 6a81e888d2c6cd1b233afa29d2f8fca0138fc6d9..18e617f6aea951161d8a6edab7d32024645277c0 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1072,7 +1072,6 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) { tOut += trn.out; root = fstGetNode(fst, trn.addr); taosArrayPush(nodes, &root); - //fstNodeDestroy(root); } if (!FST_NODE_IS_FINAL(root)) { return false; @@ -1177,7 +1176,7 @@ void fstBoundDestroy(FstBoundWithData *bound) { free(bound); } -StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) { +StreamWithState *streamWithStateCreate(Fst *fst, AutomationCtx *automation, FstBoundWithData *min, FstBoundWithData *max) { StreamWithState *sws = calloc(1, sizeof(StreamWithState)); if (sws == NULL) { return NULL; } @@ -1204,6 +1203,8 @@ void streamWithStateDestroy(StreamWithState *sws) { } bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { + + AutomationCtx *aut = sws->aut; if (fstBoundWithDataIsEmpty(min)) { if (fstBoundWithDataIsIncluded(min)) { sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null)); @@ -1211,7 +1212,7 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { StreamState s = {.node = fstGetRoot(sws->fst), .trans = 0, .out = {.null = false, .out = 0}, - .autState = sws->aut->start()}; // auto.start callback + .autState = automFuncs[aut->type].start(aut)}; // auto.start callback taosArrayPush(sws->stack, &s); return true; } @@ -1229,7 +1230,8 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { FstNode *node = fstGetRoot(sws->fst); Output out = 0; - void* autState = sws->aut->start(); + //void* autState = sws->aut->start(); + void* autState = automFuncs[aut->type].start(aut); int32_t len; uint8_t *data = fstSliceData(key, &len); @@ -1241,7 +1243,8 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { FstTransition trn; fstNodeGetTransitionAt(node, res, &trn); void *preState = autState; - autState = sws->aut->accept(preState, b); + // autState = sws->aut->accept(preState, b); + autState = automFuncs[aut->type].accept(aut, preState, b); taosArrayPush(sws->inp, &b); StreamState s = {.node = node, .trans = res + 1, @@ -1298,6 +1301,7 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { } StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallback callback) { + AutomationCtx *aut = sws->aut; FstOutput output = sws->emptyOutput; if (output.null == false) { FstSlice emptySlice = fstSliceCreate(NULL, 0); @@ -1306,15 +1310,15 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState)); return NULL; } - void* start = sws->aut->start(); - if (sws->aut->isMatch(start)) { + void *start = automFuncs[aut->type].start(aut); + if (automFuncs[aut->type].isMatch(aut, start)) { FstSlice s = fstSliceCreate(NULL, 0); return swsResultCreate(&s, output, callback(start)); } } while (taosArrayGetSize(sws->stack) > 0) { StreamState *p = (StreamState *)taosArrayPop(sws->stack); - if (p->trans >= FST_NODE_LEN(p->node) || !sws->aut->canMatch(p->autState)) { + if (p->trans >= FST_NODE_LEN(p->node) || automFuncs[aut->type].canMatch(aut, p->autState)) { if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { taosArrayPop(sws->inp); } @@ -1324,16 +1328,18 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb FstTransition trn; fstNodeGetTransitionAt(p->node, p->trans, &trn); Output out = p->out.out + trn.out; - void* nextState = sws->aut->accept(p->autState, trn.inp); + void* nextState = automFuncs[aut->type].accept(aut, p->autState, trn.inp); void* tState = callback(nextState); - bool isMatch = sws->aut->isMatch(nextState); + bool isMatch = automFuncs[aut->type].isMatch(aut, nextState); + //bool isMatch = sws->aut->isMatch(nextState); FstNode *nextNode = fstGetNode(sws->fst, trn.addr); taosArrayPush(sws->inp, &(trn.inp)); if (FST_NODE_IS_FINAL(nextNode)) { - void *eofState = sws->aut->acceptEof(nextState); + //void *eofState = sws->aut->acceptEof(nextState); + void *eofState = automFuncs[aut->type].acceptEof(aut, nextState); if (eofState != NULL) { - isMatch = sws->aut->isMatch(eofState); + isMatch = automFuncs[aut->type].isMatch(aut, eofState); } } StreamState s1 = { .node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState}; @@ -1391,7 +1397,7 @@ void streamStateDestroy(void *s) { //free(s->autoState); } -FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut) { +FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, AutomationCtx *aut) { FstStreamBuilder *b = calloc(1, sizeof(FstStreamBuilder)); if (NULL == b) { return NULL; } diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c index 3d5efd30f3c81ff7660b6a3ec1f4c5ec7b147af5..748c55c29d4b76a71dd1495de18f6b35bd834e3a 100644 --- a/source/libs/index/src/index_fst_automation.c +++ b/source/libs/index/src/index_fst_automation.c @@ -13,3 +13,85 @@ * along with this program. If not, see . */ +#include "index_fst_automation.h" + + +// prefix query, impl later +static void* prefixStart(AutomationCtx *ctx) { + return NULL; +}; +static bool prefixIsMatch(AutomationCtx *ctx, void *data) { + return true; +} +static bool prefixCanMatch(AutomationCtx *ctx, void *data) { + return true; +} +static bool prefixWillAlwaysMatch(AutomationCtx *ctx, void *state) { + return true; +} +static void* prefixAccept(AutomationCtx *ctx, void *state, uint8_t byte) { + return NULL; +} +static void* prefixAcceptEof(AutomationCtx *ctx, void *state) { + return NULL; +} + +// pattern query, impl later + +static void* patternStart(AutomationCtx *ctx) { + return NULL; +} +static bool patternIsMatch(AutomationCtx *ctx, void *data) { + return true; +} +static bool patternCanMatch(AutomationCtx *ctx, void *data) { + return true; +} +static bool patternWillAlwaysMatch(AutomationCtx *ctx, void *state) { + return true; +} + +static void* patternAccept(AutomationCtx *ctx, void *state, uint8_t byte) { + return NULL; +} + +static void* patternAcceptEof(AutomationCtx *ctx, void *state) { + return NULL; +} + +AutomationFunc automFuncs[] = {{ + prefixStart, + prefixIsMatch, + prefixCanMatch, + prefixWillAlwaysMatch, + prefixAccept, + prefixAcceptEof + }, + { + patternStart, + patternIsMatch, + patternCanMatch, + patternWillAlwaysMatch, + patternAccept, + patternAcceptEof + } + // add more search type +}; + +AutomationCtx* automCtxCreate(void *data, AutomationType type) { + AutomationCtx *ctx = calloc(1, sizeof(AutomationCtx)); + if (ctx == NULL) { return NULL; } + + ctx->type = type; + if (ctx->type == AUTOMATION_PREFIX) { + + } else if (ctx->type == AUTMMATION_MATCH) { + + } else { + // add more search type + } + return ctx; +} +void autoCtxDestroy(AutomationCtx *ctx) { + free(ctx); +} diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cpp index 928c3875b07e3ac1caa03d0c9fef175c14ab36f8..0cfb0fedc37706bfddebd7c025690baf380be014 100644 --- a/source/libs/index/test/indexTests.cpp +++ b/source/libs/index/test/indexTests.cpp @@ -65,6 +65,7 @@ class FstReadMemory { ~FstReadMemory() { fstCountingWriterDestroy(_w); + fstDestroy(_fst); fstSliceDestroy(&_s); } @@ -129,10 +130,12 @@ class FstReadMemory { //} +#define L 100 +#define M 100 +#define N 100 int Performance_fstWriteRecords(FstWriter *b) { std::string str("aa"); - int L = 100, M = 100, N = 10; for (int i = 0; i < L; i++) { str[0] = 'a' + i; str.resize(2); @@ -150,22 +153,29 @@ int Performance_fstWriteRecords(FstWriter *b) { } void Performance_fstReadRecords(FstReadMemory *m) { - std::string str("a"); - for (int i = 0; i < 50; i++) { - //std::string str("aa"); - str.push_back('a'); - uint64_t out, cost; - bool ok = m->GetWithTimeCostUs(str, &out, &cost); - if (ok == true) { - printf("success to get (%s, %" PRId64"), time cost: %" PRId64")\n", str.c_str(), out, cost); - } else { - printf("failed to get(%s)\n", str.c_str()); - } - } + std::string str("aa"); + for (int i = 0; i < M; i++) { + str[0] = 'a' + i; + str.resize(2); + for(int j = 0; j < N; j++) { + str[1] = 'a' + j; + str.resize(2); + for (int k = 0; k < L; k++) { + str.push_back('a'); + uint64_t val, cost; + if (m->GetWithTimeCostUs(str, &val, &cost)) { + printf("succes to get kv(%s, %" PRId64"), cost: %" PRId64"\n", str.c_str(), val, cost); + } else { + printf("failed to get key: %s\n", str.c_str()); + } + } + } + } } void checkFstPerf() { FstWriter *fw = new FstWriter; int64_t s = taosGetTimestampUs(); + int num = Performance_fstWriteRecords(fw); int64_t e = taosGetTimestampUs(); printf("write %d record cost %" PRId64"us\n", num, e - s); @@ -173,13 +183,11 @@ void checkFstPerf() { FstReadMemory *m = new FstReadMemory(1024 * 64); if (m->init()) { - uint64_t val; - if(m->Get("aaaaaaa", &val)) { - std::cout << "succes to Get val: " << val << std::endl; - } else { - std::cout << "failed to Get " << std::endl; - } + printf("success to init fst read"); } + Performance_fstReadRecords(m); + + delete m; } diff --git a/source/libs/parser/inc/dataBlockMgt.h b/source/libs/parser/inc/dataBlockMgt.h new file mode 100644 index 0000000000000000000000000000000000000000..350610ec0665fff1fc9ff6a2bc9919f632e6c0ea --- /dev/null +++ b/source/libs/parser/inc/dataBlockMgt.h @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DATABLOCKMGT_H +#define TDENGINE_DATABLOCKMGT_H + +#include "catalog.h" +#include "os.h" +#include "ttypes.h" +#include "tname.h" + +#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED) + +typedef enum EOrderStatus { + ORDER_STATUS_UNKNOWN = 0, + ORDER_STATUS_ORDERED = 1, + ORDER_STATUS_DISORDERED = 2, +} EOrderStatus; + +typedef enum EValStat { + VAL_STAT_HAS = 0x0, // 0 means has val + VAL_STAT_NONE = 0x01, // 1 means no val +} EValStat; + +typedef enum ERowCompareStat { + ROW_COMPARE_NO_NEED = 0, + ROW_COMPARE_NEED = 1, +} ERowCompareStat; + +typedef struct SBoundColumn { + int32_t offset; // all column offset value + int32_t toffset; // first part offset for SDataRow TODO: get offset from STSchema on future + uint8_t valStat; // EValStat. denote if current column bound or not(0 means has val, 1 means no val) +} SBoundColumn; + +typedef struct { + uint16_t schemaColIdx; + uint16_t boundIdx; + uint16_t finalIdx; +} SBoundIdxInfo; + +typedef struct SParsedDataColInfo { + int16_t numOfCols; + int16_t numOfBound; + uint16_t flen; // TODO: get from STSchema + uint16_t allNullLen; // TODO: get from STSchema + uint16_t extendedVarLen; + int32_t *boundedColumns; // bound column idx according to schema + SBoundColumn *cols; + SBoundIdxInfo *colIdxInfo; + int8_t orderStatus; // bound columns +} SParsedDataColInfo; + +typedef struct SMemRowInfo { + int32_t dataLen; // len of SDataRow + int32_t kvLen; // len of SKVRow +} SMemRowInfo; + +typedef struct { + uint8_t memRowType; // default is 0, that is SDataRow + uint8_t compareStat; // 0 no need, 1 need compare + TDRowTLenT kvRowInitLen; + SMemRowInfo *rowInfo; +} SMemRowBuilder; + +typedef struct SParamInfo { + int32_t idx; + uint8_t type; + uint8_t timePrec; + int16_t bytes; + uint32_t offset; +} SParamInfo; + +typedef struct STableDataBlocks { + SName tableName; + int8_t tsSource; // where does the UNIX timestamp come from, server or client + bool ordered; // if current rows are ordered or not + int64_t vgId; // virtual group id + int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending + int32_t numOfTables; // number of tables in current submit block + int32_t rowSize; // row size for current table + uint32_t nAllocSize; + uint32_t headerSize; // header for table info (uid, tid, submit metadata) + uint32_t size; + STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache + char *pData; + bool cloned; + STagData tagData; + + SParsedDataColInfo boundColumnInfo; + + // for parameter ('?') binding + uint32_t numOfAllocedParams; + uint32_t numOfParams; + SParamInfo * params; + SMemRowBuilder rowBuilder; +} STableDataBlocks; + +static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) { + memRowSetType(row, memRowType); + if (isDataRowT(memRowType)) { + dataRowSetVersion(memRowDataBody(row), pBlock->pTableMeta->sversion); + dataRowSetLen(memRowDataBody(row), (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pBlock->boundColumnInfo.flen)); + } else { + ASSERT(nBoundCols > 0); + memRowSetKvVersion(row, pBlock->pTableMeta->sversion); + kvRowSetNCols(memRowKvBody(row), nBoundCols); + kvRowSetLen(memRowKvBody(row), (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nBoundCols)); + } +} + +static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) { + ASSERT(pBlock->rowSize == pBlock->pTableMeta->tableInfo.rowSize); + return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen; +} + +// Applicable to consume by one row +static FORCE_INLINE void appendMemRowColValEx(SMemRow row, const void *value, bool isCopyVarData, int16_t colId, + int8_t colType, int32_t toffset, int32_t *dataLen, int32_t *kvLen, + uint8_t compareStat) { + tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset); + if (compareStat == ROW_COMPARE_NEED) { + tdGetColAppendDeltaLen(value, colType, dataLen, kvLen); + } +} + +static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowType, SParsedDataColInfo *spd, + int32_t idx, int32_t *toffset) { + int32_t schemaIdx = 0; + if (IS_DATA_COL_ORDERED(spd)) { + schemaIdx = spd->boundedColumns[idx]; + if (isDataRowT(memRowType)) { + *toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart + } else { + *toffset = idx * sizeof(SColIdx); // the offset of SColIdx + } + } else { + ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx); + schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx; + if (isDataRowT(memRowType)) { + *toffset = (spd->cols + schemaIdx)->toffset; + } else { + *toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SColIdx); + } + } +} + +static FORCE_INLINE void convertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) { + if (isDataRow(row)) { + if (kvLen < (dataLen * KVRatioConvert)) { + memRowSetConvert(row); + } + } else if (kvLen > dataLen) { + memRowSetConvert(row); + } +} + +static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { + pBlocks->tid = pTableMeta->suid; + pBlocks->uid = pTableMeta->uid; + pBlocks->sversion = pTableMeta->sversion; + + if (pBlocks->numOfRows + numOfRows >= INT16_MAX) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } else { + pBlocks->numOfRows += numOfRows; + return TSDB_CODE_SUCCESS; + } +} + +int32_t schemaIdxCompar(const void *lhs, const void *rhs); +int32_t boundIdxCompar(const void *lhs, const void *rhs); +void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols); +void destroyBoundColumnInfo(SParsedDataColInfo* pColList); +int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen); +int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows); +int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, + SName* name, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList); +int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap); + +#endif // TDENGINE_DATABLOCKMGT_H diff --git a/source/libs/parser/inc/insertParser.h b/source/libs/parser/inc/insertParser.h index 49e678cd541eae0e6b1af9473d7405928089fbfe..a2a910fcca203db9707de220837350fbdb172322 100644 --- a/source/libs/parser/inc/insertParser.h +++ b/source/libs/parser/inc/insertParser.h @@ -16,4 +16,8 @@ #ifndef TDENGINE_INSERTPARSER_H #define TDENGINE_INSERTPARSER_H +#include "parser.h" + +int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo); + #endif // TDENGINE_INSERTPARSER_H diff --git a/source/libs/parser/inc/parserInt.h b/source/libs/parser/inc/parserInt.h index ca021653826e0550c92f7d7ebcc7bb85bf72310e..47f655d446cd7f9da52ccce86788c191b06b7a73 100644 --- a/source/libs/parser/inc/parserInt.h +++ b/source/libs/parser/inc/parserInt.h @@ -26,14 +26,6 @@ extern "C" { struct SSqlNode; -typedef struct SInsertStmtInfo { - SHashObj *pTableBlockHashList; // data block for each table - SArray *pDataBlocks; // SArray. Merged submit block for each vgroup - int8_t schemaAttached; // denote if submit block is built with table schema or not - uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert - uint32_t insertType; // insert data from [file|sql statement| bound statement] - char *sql; // current sql statement position -} SInsertStmtInfo; typedef struct SInternalField { TAOS_FIELD field; diff --git a/source/libs/parser/inc/parserUtil.h b/source/libs/parser/inc/parserUtil.h index 12ffe696c13ebb88b2bfd7faf1b388a1c9295ab6..6c95c4327b99aa08e570ce697e8e3504271704f5 100644 --- a/source/libs/parser/inc/parserUtil.h +++ b/source/libs/parser/inc/parserUtil.h @@ -46,7 +46,7 @@ SInternalField* getInternalField(SFieldInfo* pFieldInfo, int32_t index); int32_t parserValidateIdToken(SToken* pToken); int32_t buildInvalidOperationMsg(SMsgBuf* pMsgBuf, const char* msg); -int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalInfo, const char* sourceStr); +int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr); STableMetaInfo* addEmptyMetaInfo(SQueryStmtInfo* pQueryInfo); @@ -61,6 +61,8 @@ void cleanupColumnCond(SArray** pCond); uint32_t convertRelationalOperator(SToken *pToken); int32_t getExprFunctionId(SExprInfo *pExprInfo); +STableMeta* tableMetaDup(const STableMeta* pTableMeta); + #ifdef __cplusplus } #endif diff --git a/source/libs/parser/inc/ttoken.h b/source/libs/parser/inc/ttoken.h index bacabe299e8c7c6dff957e9dd9addc10e2b2aef7..c041edf2b25d2ea9e270e3370e2df68ebcd19b8f 100644 --- a/source/libs/parser/inc/ttoken.h +++ b/source/libs/parser/inc/ttoken.h @@ -44,7 +44,7 @@ typedef struct SToken { * @param tokenType * @return */ -uint32_t tGetToken(char *z, uint32_t *tokenType); +uint32_t tGetToken(const char *z, uint32_t *tokenType); /** * enhanced tokenizer for sql string. @@ -54,7 +54,7 @@ uint32_t tGetToken(char *z, uint32_t *tokenType); * @param isPrevOptr * @return */ -SToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr); +SToken tStrGetToken(const char *str, int32_t *i, bool isPrevOptr); /** * check if it is a keyword or not diff --git a/source/libs/parser/src/astGenerator.c b/source/libs/parser/src/astGenerator.c index 3b7d1cbc2941e5a99468d483714cdd27351c8069..509e9009f73001ffbb22fc59dd86f46591e97467 100644 --- a/source/libs/parser/src/astGenerator.c +++ b/source/libs/parser/src/astGenerator.c @@ -191,7 +191,7 @@ tSqlExpr *tSqlExprCreate(tSqlExpr *pLeft, tSqlExpr *pRight, int32_t optrType) { pExpr->type = SQL_NODE_EXPR; if (pLeft != NULL && pRight != NULL && (optrType != TK_IN)) { - char* endPos = pRight->exprToken.z + pRight->exprToken.n; + const char* endPos = pRight->exprToken.z + pRight->exprToken.n; pExpr->exprToken.z = pLeft->exprToken.z; pExpr->exprToken.n = (uint32_t)(endPos - pExpr->exprToken.z); pExpr->exprToken.type = pLeft->exprToken.type; diff --git a/source/libs/parser/src/dataBlockMgt.c b/source/libs/parser/src/dataBlockMgt.c new file mode 100644 index 0000000000000000000000000000000000000000..4ece848888234a85dd1646bf60c80001bce1aca2 --- /dev/null +++ b/source/libs/parser/src/dataBlockMgt.c @@ -0,0 +1,665 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "dataBlockMgt.h" + +#include "catalog.h" +#include "parserUtil.h" +#include "queryInfoUtil.h" +#include "taosmsg.h" + +#define IS_RAW_PAYLOAD(t) \ + (((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert + +typedef struct SBlockKeyTuple { + TSKEY skey; + void* payloadAddr; +} SBlockKeyTuple; + +typedef struct SBlockKeyInfo { + int32_t maxBytesAlloc; + SBlockKeyTuple* pKeyTuple; +} SBlockKeyInfo; + +static int32_t rowDataCompar(const void *lhs, const void *rhs) { + TSKEY left = *(TSKEY *)lhs; + TSKEY right = *(TSKEY *)rhs; + + if (left == right) { + return 0; + } else { + return left > right ? 1 : -1; + } +} + +void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols) { + pColList->numOfCols = numOfCols; + pColList->numOfBound = numOfCols; + pColList->orderStatus = ORDER_STATUS_ORDERED; // default is ORDERED for non-bound mode + pColList->boundedColumns = calloc(pColList->numOfCols, sizeof(int32_t)); + pColList->cols = calloc(pColList->numOfCols, sizeof(SBoundColumn)); + pColList->colIdxInfo = NULL; + pColList->flen = 0; + pColList->allNullLen = 0; + + int32_t nVar = 0; + for (int32_t i = 0; i < pColList->numOfCols; ++i) { + uint8_t type = pSchema[i].type; + if (i > 0) { + pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes; + pColList->cols[i].toffset = pColList->flen; + } + pColList->flen += TYPE_BYTES[type]; + switch (type) { + case TSDB_DATA_TYPE_BINARY: + pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES); + ++nVar; + break; + case TSDB_DATA_TYPE_NCHAR: + pColList->allNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE); + ++nVar; + break; + default: + break; + } + pColList->boundedColumns[i] = pSchema[i].colId; + } + pColList->allNullLen += pColList->flen; + pColList->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT)); +} + +int32_t schemaIdxCompar(const void *lhs, const void *rhs) { + uint16_t left = *(uint16_t *)lhs; + uint16_t right = *(uint16_t *)rhs; + + if (left == right) { + return 0; + } else { + return left > right ? 1 : -1; + } +} + +int32_t boundIdxCompar(const void *lhs, const void *rhs) { + uint16_t left = *(uint16_t *)POINTER_SHIFT(lhs, sizeof(uint16_t)); + uint16_t right = *(uint16_t *)POINTER_SHIFT(rhs, sizeof(uint16_t)); + + if (left == right) { + return 0; + } else { + return left > right ? 1 : -1; + } +} + +void destroyBoundColumnInfo(SParsedDataColInfo* pColList) { + tfree(pColList->boundedColumns); + tfree(pColList->cols); + tfree(pColList->colIdxInfo); +} + +static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, SName* name, + const STableMeta* pTableMeta, STableDataBlocks** dataBlocks) { + STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks)); + if (dataBuf == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + dataBuf->nAllocSize = (uint32_t)defaultSize; + dataBuf->headerSize = startOffset; + + // the header size will always be the startOffset value, reserved for the subumit block header + if (dataBuf->nAllocSize <= dataBuf->headerSize) { + dataBuf->nAllocSize = dataBuf->headerSize * 2; + } + + //dataBuf->pData = calloc(1, dataBuf->nAllocSize); + dataBuf->pData = malloc(dataBuf->nAllocSize); + if (dataBuf->pData == NULL) { + tfree(dataBuf); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + memset(dataBuf->pData, 0, sizeof(SSubmitBlk)); + + //Here we keep the tableMeta to avoid it to be remove by other threads. + dataBuf->pTableMeta = tableMetaDup(pTableMeta); + + SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo; + SSchema* pSchema = getTableColumnSchema(dataBuf->pTableMeta); + setBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns); + + dataBuf->ordered = true; + dataBuf->prevTS = INT64_MIN; + dataBuf->rowSize = rowSize; + dataBuf->size = startOffset; + dataBuf->tsSource = -1; + dataBuf->vgId = dataBuf->pTableMeta->vgId; + + tNameAssign(&dataBuf->tableName, name); + + assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL); + + *dataBlocks = dataBuf; + return TSDB_CODE_SUCCESS; +} + +int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, + SName* name, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, + SArray* pBlockList) { + *dataBlocks = NULL; + STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id)); + if (t1 != NULL) { + *dataBlocks = *t1; + } + + if (*dataBlocks == NULL) { + int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, name, pTableMeta, dataBlocks); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + + taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES); + if (pBlockList) { + taosArrayPush(pBlockList, dataBlocks); + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t getRowExpandSize(STableMeta* pTableMeta) { + int32_t result = TD_MEM_ROW_DATA_HEAD_SIZE; + int32_t columns = getNumOfColumns(pTableMeta); + SSchema* pSchema = getTableColumnSchema(pTableMeta); + for (int32_t i = 0; i < columns; i++) { + if (IS_VAR_DATA_TYPE((pSchema + i)->type)) { + result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY]; + } + } + return result; +} + +/** + * TODO: Move to tdataformat.h and refactor when STSchema available. + * - fetch flen and toffset from STSChema and remove param spd + */ +static FORCE_INLINE void convertToSDataRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, SParsedDataColInfo *spd) { + ASSERT(isKvRow(src)); + SKVRow kvRow = memRowKvBody(src); + SDataRow dataRow = memRowDataBody(dest); + + memRowSetType(dest, SMEM_ROW_DATA); + dataRowSetVersion(dataRow, memRowKvVersion(src)); + dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + spd->flen)); + + int32_t kvIdx = 0; + for (int i = 0; i < nCols; ++i) { + SSchema *schema = pSchema + i; + void * val = tdGetKVRowValOfColEx(kvRow, schema->colId, &kvIdx); + tdAppendDataColVal(dataRow, val != NULL ? val : getNullValue(schema->type), true, schema->type, + (spd->cols + i)->toffset); + } +} + +// TODO: Move to tdataformat.h and refactor when STSchema available. +static FORCE_INLINE void convertToSKVRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, int nBoundCols, SParsedDataColInfo *spd) { + ASSERT(isDataRow(src)); + + SDataRow dataRow = memRowDataBody(src); + SKVRow kvRow = memRowKvBody(dest); + + memRowSetType(dest, SMEM_ROW_KV); + memRowSetKvVersion(kvRow, dataRowVersion(dataRow)); + kvRowSetNCols(kvRow, nBoundCols); + kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nBoundCols)); + + int32_t toffset = 0, kvOffset = 0; + for (int i = 0; i < nCols; ++i) { + if ((spd->cols + i)->valStat == VAL_STAT_HAS) { + SSchema *schema = pSchema + i; + toffset = (spd->cols + i)->toffset; + void *val = tdGetRowDataOfCol(dataRow, schema->type, toffset + TD_DATA_ROW_HEAD_SIZE); + tdAppendKvColVal(kvRow, val, true, schema->colId, schema->type, kvOffset); + kvOffset += sizeof(SColIdx); + } + } +} + +// TODO: Move to tdataformat.h and refactor when STSchema available. +static FORCE_INLINE void convertSMemRow(SMemRow dest, SMemRow src, STableDataBlocks *pBlock) { + STableMeta * pTableMeta = pBlock->pTableMeta; + STableComInfo tinfo = getTableInfo(pTableMeta); + SSchema * pSchema = getTableColumnSchema(pTableMeta); + SParsedDataColInfo *spd = &pBlock->boundColumnInfo; + + ASSERT(dest != src); + + if (isDataRow(src)) { + // TODO: Can we use pBlock -> numOfParam directly? + ASSERT(spd->numOfBound > 0); + convertToSKVRow(dest, src, pSchema, tinfo.numOfColumns, spd->numOfBound, spd); + } else { + convertToSDataRow(dest, src, pSchema, tinfo.numOfColumns, spd); + } +} + +void destroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { + if (pDataBlock == NULL) { + return; + } + + tfree(pDataBlock->pData); + + if (removeMeta) { + char name[TSDB_TABLE_FNAME_LEN] = {0}; + tNameExtractFullName(&pDataBlock->tableName, name); + + // taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + } + + if (!pDataBlock->cloned) { + tfree(pDataBlock->params); + + // free the refcount for metermeta + if (pDataBlock->pTableMeta != NULL) { + tfree(pDataBlock->pTableMeta); + } + + destroyBoundColumnInfo(&pDataBlock->boundColumnInfo); + } + + tfree(pDataBlock); +} + +void* destroyBlockArrayList(SArray* pDataBlockList) { + if (pDataBlockList == NULL) { + return NULL; + } + + size_t size = taosArrayGetSize(pDataBlockList); + for (int32_t i = 0; i < size; i++) { + void* d = taosArrayGetP(pDataBlockList, i); + destroyDataBlock(d, false); + } + + taosArrayDestroy(pDataBlockList); + return NULL; +} + +// data block is disordered, sort it in ascending order +void sortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) { + SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData; + + // size is less than the total size, since duplicated rows may be removed yet. + assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size); + + if (!dataBuf->ordered) { + char *pBlockData = pBlocks->data; + qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar); + + int32_t i = 0; + int32_t j = 1; + + while (j < pBlocks->numOfRows) { + TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i); + TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j); + + if (ti == tj) { + ++j; + continue; + } + + int32_t nextPos = (++i); + if (nextPos != j) { + memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize); + } + + ++j; + } + + dataBuf->ordered = true; + + pBlocks->numOfRows = i + 1; + dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows; + } + + dataBuf->prevTS = INT64_MIN; +} + +// data block is disordered, sort it in ascending order +int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKeyInfo) { + SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData; + int16_t nRows = pBlocks->numOfRows; + + // size is less than the total size, since duplicated rows may be removed yet. + + // allocate memory + size_t nAlloc = nRows * sizeof(SBlockKeyTuple); + if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) { + char *tmp = realloc(pBlkKeyInfo->pKeyTuple, nAlloc); + if (tmp == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple *)tmp; + pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc; + } + memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc); + + int32_t extendedRowSize = getExtendedRowSize(dataBuf); + SBlockKeyTuple *pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; + char * pBlockData = pBlocks->data; + int n = 0; + while (n < nRows) { + pBlkKeyTuple->skey = memRowKey(pBlockData); + pBlkKeyTuple->payloadAddr = pBlockData; + + // next loop + pBlockData += extendedRowSize; + ++pBlkKeyTuple; + ++n; + } + + if (!dataBuf->ordered) { + pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; + qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar); + + pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; + int32_t i = 0; + int32_t j = 1; + while (j < nRows) { + TSKEY ti = (pBlkKeyTuple + i)->skey; + TSKEY tj = (pBlkKeyTuple + j)->skey; + + if (ti == tj) { + ++j; + continue; + } + + int32_t nextPos = (++i); + if (nextPos != j) { + memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple)); + } + ++j; + } + + dataBuf->ordered = true; + pBlocks->numOfRows = i + 1; + } + + dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize; + dataBuf->prevTS = INT64_MIN; + + return 0; +} + +// Erase the empty space reserved for binary data +static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, int8_t schemaAttached, bool isRawPayload) { + // TODO: optimize this function, handle the case while binary is not presented + STableMeta* pTableMeta = pTableDataBlock->pTableMeta; + STableComInfo tinfo = getTableInfo(pTableMeta); + SSchema* pSchema = getTableColumnSchema(pTableMeta); + + SSubmitBlk* pBlock = pDataBlock; + memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk)); + pDataBlock = (char*)pDataBlock + sizeof(SSubmitBlk); + + int32_t flen = 0; // original total length of row + + // schema needs to be included into the submit data block + if (schemaAttached) { + int32_t numOfCols = getNumOfColumns(pTableDataBlock->pTableMeta); + for(int32_t j = 0; j < numOfCols; ++j) { + STColumn* pCol = (STColumn*) pDataBlock; + pCol->colId = htons(pSchema[j].colId); + pCol->type = pSchema[j].type; + pCol->bytes = htons(pSchema[j].bytes); + pCol->offset = 0; + + pDataBlock = (char*)pDataBlock + sizeof(STColumn); + flen += TYPE_BYTES[pSchema[j].type]; + } + + int32_t schemaSize = sizeof(STColumn) * numOfCols; + pBlock->schemaLen = schemaSize; + } else { + if (isRawPayload) { + for (int32_t j = 0; j < tinfo.numOfColumns; ++j) { + flen += TYPE_BYTES[pSchema[j].type]; + } + } + pBlock->schemaLen = 0; + } + + char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); + pBlock->dataLen = 0; + int32_t numOfRows = htons(pBlock->numOfRows); + + if (isRawPayload) { + for (int32_t i = 0; i < numOfRows; ++i) { + SMemRow memRow = (SMemRow)pDataBlock; + memRowSetType(memRow, SMEM_ROW_DATA); + SDataRow trow = memRowDataBody(memRow); + dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + flen)); + dataRowSetVersion(trow, pTableMeta->sversion); + + int toffset = 0; + for (int32_t j = 0; j < tinfo.numOfColumns; j++) { + tdAppendColVal(trow, p, pSchema[j].type, toffset); + toffset += TYPE_BYTES[pSchema[j].type]; + p += pSchema[j].bytes; + } + + pDataBlock = (char*)pDataBlock + memRowTLen(memRow); + pBlock->dataLen += memRowTLen(memRow); + } + } else { + for (int32_t i = 0; i < numOfRows; ++i) { + char* payload = (blkKeyTuple + i)->payloadAddr; + if (isNeedConvertRow(payload)) { + convertSMemRow(pDataBlock, payload, pTableDataBlock); + TDRowTLenT rowTLen = memRowTLen(pDataBlock); + pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); + pBlock->dataLen += rowTLen; + } else { + TDRowTLenT rowTLen = memRowTLen(payload); + memcpy(pDataBlock, payload, rowTLen); + pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); + pBlock->dataLen += rowTLen; + } + } + } + + int32_t len = pBlock->dataLen + pBlock->schemaLen; + pBlock->dataLen = htonl(pBlock->dataLen); + pBlock->schemaLen = htonl(pBlock->schemaLen); + + return len; +} + +static void extractTableNameList(SHashObj* pHashObj, bool freeBlockMap) { + // todo +} + +int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap) { + const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); + int code = 0; + bool isRawPayload = IS_RAW_PAYLOAD(payloadType); + SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); + + STableDataBlocks** p = taosHashIterate(pHashObj, NULL); + STableDataBlocks* pOneTableBlock = *p; + SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock + while (pOneTableBlock) { + SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; + if (pBlocks->numOfRows > 0) { + STableDataBlocks* dataBuf = NULL; + int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, + INSERT_HEAD_SIZE, 0, &pOneTableBlock->tableName, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList); + if (ret != TSDB_CODE_SUCCESS) { + taosHashCleanup(pVnodeDataBlockHashList); + destroyBlockArrayList(pVnodeDataBlockList); + tfree(blkKeyInfo.pKeyTuple); + return ret; + } + + // the maximum expanded size in byte when a row-wise data is converted to SDataRow format + int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; + int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + + sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta); + + if (dataBuf->nAllocSize < destSize) { + dataBuf->nAllocSize = (uint32_t)(destSize * 1.5); + char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize); + if (tmp != NULL) { + dataBuf->pData = tmp; + } else { // failed to allocate memory, free already allocated memory and return error code + taosHashCleanup(pVnodeDataBlockHashList); + destroyBlockArrayList(pVnodeDataBlockList); + tfree(dataBuf->pData); + tfree(blkKeyInfo.pKeyTuple); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + if (isRawPayload) { + sortRemoveDataBlockDupRowsRaw(pOneTableBlock); + } else { + if ((code = sortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) { + taosHashCleanup(pVnodeDataBlockHashList); + destroyBlockArrayList(pVnodeDataBlockList); + tfree(dataBuf->pData); + tfree(blkKeyInfo.pKeyTuple); + return code; + } + ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0); + } + + int32_t len = pBlocks->numOfRows * + (isRawPayload ? (pOneTableBlock->rowSize + expandSize) : getExtendedRowSize(pOneTableBlock)) + + sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta); + + pBlocks->tid = htonl(pBlocks->tid); + pBlocks->uid = htobe64(pBlocks->uid); + pBlocks->sversion = htonl(pBlocks->sversion); + pBlocks->numOfRows = htons(pBlocks->numOfRows); + pBlocks->schemaLen = 0; + + // erase the empty space reserved for binary data + int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, schemaAttached, isRawPayload); + assert(finalLen <= len); + + dataBuf->size += (finalLen + sizeof(SSubmitBlk)); + assert(dataBuf->size <= dataBuf->nAllocSize); + + // the length does not include the SSubmitBlk structure + pBlocks->dataLen = htonl(finalLen); + dataBuf->numOfTables += 1; + + pBlocks->numOfRows = 0; + } + + p = taosHashIterate(pHashObj, p); + if (p == NULL) { + break; + } + + pOneTableBlock = *p; + } + + extractTableNameList(pHashObj, freeBlockMap); + + // free the table data blocks; + // pInsertParam->pDataBlocks = pVnodeDataBlockList; + taosHashCleanup(pVnodeDataBlockHashList); + tfree(blkKeyInfo.pKeyTuple); + + return TSDB_CODE_SUCCESS; +} + +int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) { + size_t remain = pDataBlock->nAllocSize - pDataBlock->size; + const int factor = 5; + uint32_t nAllocSizeOld = pDataBlock->nAllocSize; + + // expand the allocated size + if (remain < rowSize * factor) { + while (remain < rowSize * factor) { + pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5); + remain = pDataBlock->nAllocSize - pDataBlock->size; + } + + char *tmp = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize); + if (tmp != NULL) { + pDataBlock->pData = tmp; + memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size); + } else { + // do nothing, if allocate more memory failed + pDataBlock->nAllocSize = nAllocSizeOld; + *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize; + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize; + return TSDB_CODE_SUCCESS; +} + +int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen) { + ASSERT(nRows >= 0 && nCols > 0 && (nBoundCols <= nCols)); + if (nRows > 0) { + // already init(bind multiple rows by single column) + if (pBuilder->compareStat == ROW_COMPARE_NEED && (pBuilder->rowInfo != NULL)) { + return TSDB_CODE_SUCCESS; + } + } + + // default compareStat is ROW_COMPARE_NO_NEED + if (nBoundCols == 0) { // file input + pBuilder->memRowType = SMEM_ROW_DATA; + return TSDB_CODE_SUCCESS; + } else { + float boundRatio = ((float)nBoundCols / (float)nCols); + + if (boundRatio < KVRatioKV) { + pBuilder->memRowType = SMEM_ROW_KV; + return TSDB_CODE_SUCCESS; + } else if (boundRatio > KVRatioData) { + pBuilder->memRowType = SMEM_ROW_DATA; + return TSDB_CODE_SUCCESS; + } + pBuilder->compareStat = ROW_COMPARE_NEED; + + if (boundRatio < KVRatioPredict) { + pBuilder->memRowType = SMEM_ROW_KV; + } else { + pBuilder->memRowType = SMEM_ROW_DATA; + } + } + + pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx); + + if (nRows > 0) { + pBuilder->rowInfo = calloc(nRows, sizeof(SMemRowInfo)); + if (pBuilder->rowInfo == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + for (int i = 0; i < nRows; ++i) { + (pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen; + (pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen; + } + } + + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c new file mode 100644 index 0000000000000000000000000000000000000000..0d9d6d7580c181c097aab73908d9d5f64d13d3b8 --- /dev/null +++ b/source/libs/parser/src/insertParser.c @@ -0,0 +1,891 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "insertParser.h" + +#include "dataBlockMgt.h" +#include "parserInt.h" +#include "parserUtil.h" +#include "queryInfoUtil.h" +#include "tglobal.h" +#include "ttime.h" +#include "ttoken.h" +#include "ttypes.h" + +#define NEXT_TOKEN(pSql, sToken) \ + do { \ + int32_t index = 0; \ + sToken = tStrGetToken(pSql, &index, false); \ + pSql += index; \ + } while (0) + +#define CHECK_CODE(expr) \ + do { \ + int32_t code = expr; \ + if (TSDB_CODE_SUCCESS != code) { \ + terrno = code; \ + return terrno; \ + } \ + } while (0) + +#define CHECK_CODE_1(expr, destroy) \ + do { \ + int32_t code = expr; \ + if (TSDB_CODE_SUCCESS != code) { \ + (void)destroy; \ + terrno = code; \ + return terrno; \ + } \ + } while (0) + +#define CHECK_CODE_2(expr, destroy1, destroy2) \ + do { \ + int32_t code = expr; \ + if (TSDB_CODE_SUCCESS != code) { \ + (void)destroy1; \ + (void)destroy2; \ + terrno = code; \ + return terrno; \ + } \ + } while (0) + +enum { + TSDB_USE_SERVER_TS = 0, + TSDB_USE_CLI_TS = 1, +}; + +typedef struct SInsertParseContext { + SParseContext* pComCxt; + const char* pSql; + SMsgBuf msg; + struct SCatalog* pCatalog; + SMetaData meta; // need release + const STableMeta* pTableMeta; + SHashObj* pTableBlockHashObj; // data block for each table. need release + int32_t totalNum; + SInsertStmtInfo* pOutput; +} SInsertParseContext; + +static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE; +static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE; + +static bool isNullStr(SToken *pToken) { + return (pToken->type == TK_NULL) || ((pToken->type == TK_STRING) && (pToken->n != 0) && + (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)); +} + +static FORCE_INLINE int32_t toDouble(SToken *pToken, double *value, char **endPtr) { + errno = 0; + *value = strtold(pToken->z, endPtr); + + // not a valid integer number, return error + if ((*endPtr - pToken->z) != pToken->n) { + return TK_ILLEGAL; + } + + return pToken->type; +} + +static int32_t toInt64(const char* z, int16_t type, int32_t n, int64_t* value, bool issigned) { + errno = 0; + int32_t ret = 0; + + char* endPtr = NULL; + if (type == TK_FLOAT) { + double v = strtod(z, &endPtr); + if ((errno == ERANGE && v == HUGE_VALF) || isinf(v) || isnan(v)) { + ret = -1; + } else if ((issigned && (v < INT64_MIN || v > INT64_MAX)) || ((!issigned) && (v < 0 || v > UINT64_MAX))) { + ret = -1; + } else { + *value = (int64_t) round(v); + } + + errno = 0; + return ret; + } + + int32_t radix = 10; + if (type == TK_HEX) { + radix = 16; + } else if (type == TK_BIN) { + radix = 2; + } + + // the string may be overflow according to errno + if (!issigned) { + const char *p = z; + while(*p != 0 && *p == ' ') p++; + if (*p != 0 && *p == '-') { return -1;} + + *value = strtoull(z, &endPtr, radix); + } else { + *value = strtoll(z, &endPtr, radix); + } + + // not a valid integer number, return error + if (endPtr - z != n || errno == ERANGE) { + ret = -1; + } + + errno = 0; + return ret; +} + +static int32_t createInsertStmtInfo(SInsertStmtInfo **pInsertInfo) { + SInsertStmtInfo *info = calloc(1, sizeof(SQueryStmtInfo)); + if (NULL == info) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + // info->pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + // if (NULL == info->pTableBlockHashList) { + // tfree(info); + // return TSDB_CODE_TSC_OUT_OF_MEMORY; + // } + *pInsertInfo = info; + return TSDB_CODE_SUCCESS; +} + +static int32_t skipInsertInto(SInsertParseContext* pCxt) { + SToken sToken; + NEXT_TOKEN(pCxt->pSql, sToken); + if (TK_INSERT != sToken.type) { + return buildSyntaxErrMsg(&pCxt->msg, "keyword INSERT is expected", sToken.z); + } + NEXT_TOKEN(pCxt->pSql, sToken); + if (TK_INTO != sToken.type) { + return buildSyntaxErrMsg(&pCxt->msg, "keyword INTO is expected", sToken.z); + } + return TSDB_CODE_SUCCESS; +} + +static int32_t buildTableName(SInsertParseContext* pCxt, SToken* pStname, SArray* tableNameList) { + if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) { + return buildInvalidOperationMsg(&pCxt->msg, "invalid table name"); + } + + SName name = {0}; + strndequote(name.tname, pStname->z, pStname->n); + taosArrayPush(tableNameList, &name); + + return TSDB_CODE_SUCCESS; +} + +static int32_t buildMetaReq(SInsertParseContext* pCxt, SToken* pStname, SMetaReq* pMetaReq) { + pMetaReq->pTableName = taosArrayInit(4, sizeof(SName)); + return buildTableName(pCxt, pStname, pMetaReq->pTableName); +} + +static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { + SMetaReq req; + CHECK_CODE(buildMetaReq(pCxt, pTname, &req)); + CHECK_CODE(catalogGetMetaData(pCxt->pCatalog, &req, &pCxt->meta)); + pCxt->pTableMeta = (STableMeta*)taosArrayGetP(pCxt->meta.pTableMeta, 0); + return TSDB_CODE_SUCCESS; +} + +// todo speedup by using hash list +static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) { + while (start < end) { + if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) { + return start; + } + ++start; + } + return -1; +} + +static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start) { + // once the data block is disordered, we do NOT keep previous timestamp any more + if (!pDataBlocks->ordered) { + return TSDB_CODE_SUCCESS; + } + + TSKEY k = *(TSKEY *)start; + + if (k == INT64_MIN) { + if (pDataBlocks->tsSource == TSDB_USE_CLI_TS) { + return -1; + } else if (pDataBlocks->tsSource == -1) { + pDataBlocks->tsSource = TSDB_USE_SERVER_TS; + } + } else { + if (pDataBlocks->tsSource == TSDB_USE_SERVER_TS) { + return -1; // client time/server time can not be mixed + } else if (pDataBlocks->tsSource == -1) { + pDataBlocks->tsSource = TSDB_USE_CLI_TS; + } + } + + if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) { + pDataBlocks->ordered = false; + } + + pDataBlocks->prevTS = k; + return TSDB_CODE_SUCCESS; +} + +static int parseTime(SInsertParseContext* pCxt, SToken *pToken, int16_t timePrec, int64_t *time) { + int32_t index = 0; + SToken sToken; + int64_t interval; + int64_t useconds = 0; + const char* pTokenEnd = pCxt->pSql; + + if (pToken->type == TK_NOW) { + useconds = taosGetTimestamp(timePrec); + } else if (strncmp(pToken->z, "0", 1) == 0 && pToken->n == 1) { + // do nothing + } else if (pToken->type == TK_INTEGER) { + useconds = taosStr2int64(pToken->z); + } else { + // strptime("2001-11-12 18:31:01", "%Y-%m-%d %H:%M:%S", &tm); + if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp format", pToken->z); + } + + return TSDB_CODE_SUCCESS; + } + + for (int k = pToken->n; pToken->z[k] != '\0'; k++) { + if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue; + if (pToken->z[k] == ',') { + pCxt->pSql = pTokenEnd; + *time = useconds; + return 0; + } + + break; + } + + /* + * time expression: + * e.g., now+12a, now-5h + */ + SToken valueToken; + index = 0; + sToken = tStrGetToken(pTokenEnd, &index, false); + pTokenEnd += index; + + if (sToken.type == TK_MINUS || sToken.type == TK_PLUS) { + index = 0; + valueToken = tStrGetToken(pTokenEnd, &index, false); + pTokenEnd += index; + + if (valueToken.n < 2) { + return buildSyntaxErrMsg(&pCxt->msg, "value expected in timestamp", sToken.z); + } + + char unit = 0; + if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + + if (sToken.type == TK_PLUS) { + useconds += interval; + } else { + useconds = useconds - interval; + } + + pCxt->pSql = pTokenEnd; + } + + *time = useconds; + return TSDB_CODE_SUCCESS; +} + +typedef int32_t (*FRowAppend)(const void *value, int32_t len, void *param); + +typedef struct SKvParam { + char buf[TSDB_MAX_TAGS_LEN]; + SKVRowBuilder* builder; + SSchema* schema; +} SKvParam; + +static FORCE_INLINE int32_t KvRowAppend(const void *value, int32_t len, void *param) { + SKvParam* pa = (SKvParam*)param; + if (TSDB_DATA_TYPE_BINARY == pa->schema->type) { + STR_WITH_SIZE_TO_VARSTR(pa->buf, value, len); + tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, pa->buf); + } else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) { + // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' + int32_t output = 0; + if (!taosMbsToUcs4(value, len, varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) { + return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; + } + varDataSetLen(pa->buf, output); + tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, pa->buf); + } else { + tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, value); + } + return TSDB_CODE_SUCCESS; +} + +typedef struct SMemParam { + SMemRow row; + SSchema* schema; + int32_t toffset; + uint8_t compareStat; + int32_t dataLen; + int32_t kvLen; +} SMemParam; + +static FORCE_INLINE int32_t MemRowAppend(const void *value, int32_t len, void *param) { + SMemParam* pa = (SMemParam*)param; + if (TSDB_DATA_TYPE_BINARY == pa->schema->type) { + char *rowEnd = memRowEnd(pa->row); + STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len); + appendMemRowColValEx(pa->row, rowEnd, true, pa->schema->colId, pa->schema->type, pa->toffset, &pa->dataLen, &pa->kvLen, pa->compareStat); + } else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) { + // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' + int32_t output = 0; + char * rowEnd = memRowEnd(pa->row); + if (!taosMbsToUcs4(value, len, (char *)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) { + return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; + } + varDataSetLen(rowEnd, output); + appendMemRowColValEx(pa->row, rowEnd, false, pa->schema->colId, pa->schema->type, pa->toffset, &pa->dataLen, &pa->kvLen, pa->compareStat); + } else { + appendMemRowColValEx(pa->row, value, true, pa->schema->colId, pa->schema->type, pa->toffset, &pa->dataLen, &pa->kvLen, pa->compareStat); + } + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t checkAndTrimValue(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, char* tmpTokenBuf) { + int16_t type = pToken->type; + if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && + type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || + (pToken->n == 0) || (type == TK_RP)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pToken->z); + } + + if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid numeric data", pToken->z); + } + + // Remove quotation marks + if (TK_STRING == type) { + if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) { + return buildSyntaxErrMsg(&pCxt->msg, "too long string", pToken->z); + } + // delete escape character: \\, \', \" + char delim = pToken->z[0]; + int32_t cnt = 0; + int32_t j = 0; + for (uint32_t k = 1; k < pToken->n - 1; ++k) { + if (pToken->z[k] == '\\' || (pToken->z[k] == delim && pToken->z[k + 1] == delim)) { + tmpTokenBuf[j] = pToken->z[k + 1]; + cnt++; + j++; + k++; + continue; + } + tmpTokenBuf[j] = pToken->z[k]; + j++; + } + tmpTokenBuf[j] = 0; + pToken->z = tmpTokenBuf; + pToken->n -= 2 + cnt; + } + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t parseOneValue(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, FRowAppend func, void* param) { + int64_t iv; + int32_t ret; + char * endptr = NULL; + + CHECK_CODE(checkAndTrimValue(pCxt, pToken, pSchema, tmpTokenBuf)); + + if (isNullStr(pToken)) { + if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { + int64_t tmpVal = 0; + return func(&tmpVal, pSchema->bytes, param); + } + return func(getNullValue(pSchema->type), 0, param); + } + + switch (pSchema->type) { + case TSDB_DATA_TYPE_BOOL: { + if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { + if (strncmp(pToken->z, "true", pToken->n) == 0) { + return func(&TRUE_VALUE, pSchema->bytes, param); + } else if (strncmp(pToken->z, "false", pToken->n) == 0) { + return func(&FALSE_VALUE, pSchema->bytes, param); + } else { + return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z); + } + } else if (pToken->type == TK_INTEGER) { + return func(((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); + } else if (pToken->type == TK_FLOAT) { + return func(((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); + } else { + return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z); + } + break; + } + case TSDB_DATA_TYPE_TINYINT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z); + } else if (!IS_VALID_TINYINT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "data overflow", pToken->z); + } + uint8_t tmpVal = (uint8_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_UTINYINT:{ + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z); + } else if (!IS_VALID_UTINYINT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z); + } + uint8_t tmpVal = (uint8_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_SMALLINT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z); + } else if (!IS_VALID_SMALLINT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z); + } + int16_t tmpVal = (int16_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_USMALLINT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z); + } else if (!IS_VALID_USMALLINT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z); + } + uint16_t tmpVal = (uint16_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_INT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z); + } else if (!IS_VALID_INT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z); + } + int32_t tmpVal = (int32_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_UINT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z); + } else if (!IS_VALID_UINT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z); + } + uint32_t tmpVal = (uint32_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_BIGINT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z); + } else if (!IS_VALID_BIGINT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "bigint data overflow", pToken->z); + } + return func(&iv, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_UBIGINT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z); + } else if (!IS_VALID_UBIGINT((uint64_t)iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "unsigned bigint data overflow", pToken->z); + } + uint64_t tmpVal = (uint64_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_FLOAT: { + double dv; + if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { + return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z); + } + if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) { + return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z); + } + float tmpVal = (float)dv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_DOUBLE: { + double dv; + if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { + return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z); + } + if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) { + return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z); + } + return func(&dv, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_BINARY: { + // too long values will return invalid sql, not be truncated automatically + if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor + return buildSyntaxErrMsg(&pCxt->msg, "string data overflow", pToken->z); + } + return func(pToken->z, pToken->n, param); + } + case TSDB_DATA_TYPE_NCHAR: { + return func(pToken->z, pToken->n, param); + } + case TSDB_DATA_TYPE_TIMESTAMP: { + int64_t tmpVal; + if (parseTime(pCxt, pToken, timePrec, &tmpVal) != TSDB_CODE_SUCCESS) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z); + } + return func(&tmpVal, pSchema->bytes, param); + } + } + + return TSDB_CODE_FAILED; +} + +// pSql -> tag1_name, ...) +static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) { + int32_t nCols = pColList->numOfCols; + + pColList->numOfBound = 0; + memset(pColList->boundedColumns, 0, sizeof(int32_t) * nCols); + for (int32_t i = 0; i < nCols; ++i) { + pColList->cols[i].valStat = VAL_STAT_NONE; + } + + SToken sToken; + bool isOrdered = true; + int32_t lastColIdx = -1; // last column found + while (1) { + NEXT_TOKEN(pCxt->pSql, sToken); + + if (TK_RP == sToken.type) { + break; + } + + int32_t t = lastColIdx + 1; + int32_t index = findCol(&sToken, t, nCols, pSchema); + if (index < 0 && t > 0) { + index = findCol(&sToken, 0, t, pSchema); + isOrdered = false; + } + if (index < 0) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid column/tag name", sToken.z); + } + if (pColList->cols[index].valStat == VAL_STAT_HAS) { + return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z); + } + lastColIdx = index; + pColList->cols[index].valStat = VAL_STAT_HAS; + pColList->boundedColumns[pColList->numOfBound] = index; + ++pColList->numOfBound; + } + + pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED; + + if (!isOrdered) { + pColList->colIdxInfo = calloc(pColList->numOfBound, sizeof(SBoundIdxInfo)); + if (NULL == pColList->colIdxInfo) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + SBoundIdxInfo* pColIdx = pColList->colIdxInfo; + for (uint16_t i = 0; i < pColList->numOfBound; ++i) { + pColIdx[i].schemaColIdx = (uint16_t)pColList->boundedColumns[i]; + pColIdx[i].boundIdx = i; + } + qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar); + for (uint16_t i = 0; i < pColList->numOfBound; ++i) { + pColIdx[i].finalIdx = i; + } + qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar); + } + + memset(&pColList->boundedColumns[pColList->numOfBound], 0, sizeof(int32_t) * (pColList->numOfCols - pColList->numOfBound)); + + return TSDB_CODE_SUCCESS; +} + +// pSql -> tag1_value, ...) +static int32_t parseTagsClause(SInsertParseContext* pCxt, SParsedDataColInfo* pSpd, SSchema* pTagsSchema, uint8_t precision) { + SKVRowBuilder kvRowBuilder = {0}; + if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { + destroyBoundColumnInfo(pSpd); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + SKvParam param = {.builder = &kvRowBuilder}; + SToken sToken; + char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" + for (int i = 0; i < pSpd->numOfBound; ++i) { + NEXT_TOKEN(pCxt->pSql, sToken); + SSchema* pSchema = &pTagsSchema[pSpd->boundedColumns[i]]; + param.schema = pSchema; + CHECK_CODE_2(parseOneValue(pCxt, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, ¶m), tdDestroyKVRowBuilder(&kvRowBuilder), destroyBoundColumnInfo(pSpd)); + } + + destroyBoundColumnInfo(pSpd); + SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); + tdDestroyKVRowBuilder(&kvRowBuilder); + if (NULL == row) { + return buildInvalidOperationMsg(&pCxt->msg, "tag value expected"); + } + tdSortKVRowByColIdx(row); + + // todo construct payload + + tfree(row); +} + +// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) +static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) { + SToken sToken; + + // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) + NEXT_TOKEN(pCxt->pSql, sToken); + CHECK_CODE(getTableMeta(pCxt, &sToken)); + if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) { + return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); + } + + SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta); + SParsedDataColInfo spd = {0}; + setBoundColumnInfo(&spd, pTagsSchema, getNumOfTags(pCxt->pTableMeta)); + + // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...) + NEXT_TOKEN(pCxt->pSql, sToken); + if (TK_LP == sToken.type) { + CHECK_CODE_1(parseBoundColumns(pCxt, &spd, pTagsSchema), destroyBoundColumnInfo(&spd)); + NEXT_TOKEN(pCxt->pSql, sToken); + } + + if (TK_TAGS != sToken.type) { + return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z); + } + // pSql -> (tag1_value, ...) + NEXT_TOKEN(pCxt->pSql, sToken); + if (TK_LP != sToken.type) { + return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z); + } + CHECK_CODE(parseTagsClause(pCxt, &spd, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision)); + + return TSDB_CODE_SUCCESS; +} + +static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len, char* tmpTokenBuf) { + SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo; + SMemRowBuilder* pBuilder = &pDataBlocks->rowBuilder; + char *row = pDataBlocks->pData + pDataBlocks->size; // skip the SSubmitBlk header + initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound); + + bool isParseBindParam = false; + SSchema* schema = getTableColumnSchema(pDataBlocks->pTableMeta); + SMemParam param = {.row = row}; + SToken sToken = {0}; + // 1. set the parsed value from sql string + for (int i = 0; i < spd->numOfBound; ++i) { + NEXT_TOKEN(pCxt->pSql, sToken); + // todo bind param + SSchema *pSchema = &schema[spd->boundedColumns[i]]; + param.schema = pSchema; + param.compareStat = pBuilder->compareStat; + getMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, ¶m.toffset); + CHECK_CODE(parseOneValue(pCxt, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, ¶m)); + + if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { + TSKEY tsKey = memRowKey(row); + if (checkTimestamp(pDataBlocks, (const char *)&tsKey) != TSDB_CODE_SUCCESS) { + buildSyntaxErrMsg(&pCxt->msg, "client time/server time can not be mixed up", sToken.z); + return TSDB_CODE_TSC_INVALID_TIME_STAMP; + } + } + } + + if (!isParseBindParam) { + // 2. check and set convert flag + if (pBuilder->compareStat == ROW_COMPARE_NEED) { + convertMemRow(row, spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE, pBuilder->kvRowInitLen); + } + + // 3. set the null value for the columns that do not assign values + if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow(row)) { + SDataRow dataRow = memRowDataBody(row); + for (int32_t i = 0; i < spd->numOfCols; ++i) { + if (spd->cols[i].valStat == VAL_STAT_NONE) { + tdAppendDataColVal(dataRow, getNullValue(schema[i].type), true, schema[i].type, spd->cols[i].toffset); + } + } + } + } + + *len = getExtendedRowSize(pDataBlocks); + return TSDB_CODE_SUCCESS; +} + +// pSql -> (field1_value, ...) [(field1_value2, ...) ...] +static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) { + STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta); + int32_t extendedRowSize = getExtendedRowSize(pDataBlock); + CHECK_CODE(initMemRowBuilder(&pDataBlock->rowBuilder, 0, tinfo.numOfColumns, pDataBlock->boundColumnInfo.numOfBound, pDataBlock->boundColumnInfo.allNullLen)); + + (*numOfRows) = 0; + char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" + SToken sToken; + while (1) { + NEXT_TOKEN(pCxt->pSql, sToken); + if (TK_LP != sToken.type) { + break; + } + + if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) { + int32_t tSize; + CHECK_CODE(allocateMemIfNeed(pDataBlock, extendedRowSize, &tSize)); + ASSERT(tSize >= maxRows); + maxRows = tSize; + } + + int32_t len = 0; + CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &len, tmpTokenBuf)); + pDataBlock->size += len; + + NEXT_TOKEN(pCxt->pSql, sToken); + if (TK_RP != sToken.type) { + return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z); + } + + (*numOfRows)++; + } + + if (0 == (*numOfRows)) { + return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL); + } + return TSDB_CODE_SUCCESS; +} + +static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) { + int32_t maxNumOfRows; + CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows)); + + int32_t numOfRows = 0; + CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows)); + + for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) { + SParamInfo *param = dataBuf->params + i; + if (param->idx == -1) { + // param->idx = pInsertParam->numOfParams++; + param->offset -= sizeof(SSubmitBlk); + } + } + + SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData); + if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf->pTableMeta, numOfRows)) { + return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767"); + } + + dataBuf->numOfTables = 1; + pCxt->totalNum += numOfRows; + return TSDB_CODE_SUCCESS; +} + +// tb_name +// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)] +// [(field1_name, ...)] +// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path +// [...]; +static int32_t parseInsertBody(SInsertParseContext* pCxt) { + while (1) { + SToken sToken; + // pSql -> tb_name ... + NEXT_TOKEN(pCxt->pSql, sToken); + + // no data in the sql string anymore. + if (sToken.n == 0) { + if (0 == pCxt->totalNum) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + break; + } + + SToken tbnameToken = sToken; + NEXT_TOKEN(pCxt->pSql, sToken); + + // USING cluase + if (TK_USING == sToken.type) { + CHECK_CODE(parseUsingClause(pCxt, &tbnameToken)); + NEXT_TOKEN(pCxt->pSql, sToken); + } else { + CHECK_CODE(getTableMeta(pCxt, &sToken)); + } + + STableDataBlocks *dataBuf = NULL; + CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, + sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, NULL/* tbname */, pCxt->pTableMeta, &dataBuf, NULL)); + + if (TK_LP == sToken.type) { + // pSql -> field1_name, ...) + CHECK_CODE_1(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)), destroyBoundColumnInfo(&dataBuf->boundColumnInfo)); + NEXT_TOKEN(pCxt->pSql, sToken); + } + + if (TK_VALUES == sToken.type) { + // pSql -> (field1_value, ...) [(field1_value2, ...) ...] + CHECK_CODE(parseValuesClause(pCxt, dataBuf)); + continue; + } + + // FILE csv_file_path + if (TK_FILE == sToken.type) { + // pSql -> csv_file_path + NEXT_TOKEN(pCxt->pSql, sToken); + if (0 == sToken.n || (TK_STRING != sToken.type && TK_ID != sToken.type)) { + return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z); + } + // todo + continue; + } + + return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z); + } + // merge according to vgId + if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { + CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->schemaAttache, pCxt->pOutput->payloadType, true)); + } + return TSDB_CODE_SUCCESS; +} + +// INSERT INTO +// tb_name +// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)] +// [(field1_name, ...)] +// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path +// [...]; +int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) { + CHECK_CODE(createInsertStmtInfo(pInfo)); + + SInsertParseContext context = { + .pComCxt = pContext, + .pSql = pContext->pSql, + .msg = {.buf = pContext->pMsg, .len = pContext->msgLen}, + .pCatalog = getCatalogHandle(pContext->pEpSet), + .pTableMeta = NULL, + .pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false), + .totalNum = 0, + .pOutput = *pInfo + }; + + if (NULL == context.pTableBlockHashObj) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + CHECK_CODE(skipInsertInto(&context)); + CHECK_CODE(parseInsertBody(&context)); + + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 29561f7f54ee55e785445c2b8ac5ee62c620dcb4..a8642e2535fd2cbd57a50558a05000bac3ea029e 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -18,6 +18,7 @@ #include "parserUtil.h" #include "ttoken.h" #include "function.h" +#include "insertParser.h" bool qIsInsertSql(const char* pStr, size_t length) { int32_t index = 0; @@ -46,8 +47,8 @@ int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo** return qParserValidateSqlNode(pCatalog, &info, *pQueryInfo, id, msg, msgLen); } -int32_t qParseInsertSql(const char* pStr, size_t length, struct SInsertStmtInfo** pInsertInfo, int64_t id, char* msg, int32_t msgLen) { - return 0; +int32_t qParseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) { + return parseInsertSql(pContext, pInfo); } int32_t qParserConvertSql(const char* pStr, size_t length, char** pConvertSql) { @@ -173,7 +174,7 @@ int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMet assert(t != NULL); if (t->n >= TSDB_FUNC_NAME_LEN) { - return buildSyntaxErrMsg(msg, msgBufLen, "too long function name", t->z); + return buildSyntaxErrMsg(&msgBuf, "too long function name", t->z); } // Let's assume that it is an UDF/UDAF, if it is not a built-in function. diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index 3e83381a76e6145fa7157459f20c10460a3687e9..7b3239c0ddb7343e4a68bf247640434e86b798a6 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "parserUtil.h" + #include "taosmsg.h" #include "parser.h" #include "taoserror.h" @@ -18,7 +34,6 @@ typedef struct STableFilterCond { static STableMetaInfo* addTableMetaInfo(SQueryStmtInfo* pQueryInfo, SName* name, STableMeta* pTableMeta, SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables); -STableMeta* tableMetaDup(STableMeta* pTableMeta); int32_t parserValidateIdToken(SToken* pToken) { if (pToken == NULL || pToken->z == NULL || pToken->type != TK_ID) { @@ -87,7 +102,7 @@ int32_t buildInvalidOperationMsg(SMsgBuf* pBuf, const char* msg) { return TSDB_CODE_TSC_INVALID_OPERATION; } -int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalInfo, const char* sourceStr) { +int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr) { const char* msgFormat1 = "syntax error near \'%s\'"; const char* msgFormat2 = "syntax error near \'%s\' (%s)"; const char* msgFormat3 = "%s"; @@ -95,7 +110,7 @@ int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalIn const char* prefix = "syntax error"; if (sourceStr == NULL) { assert(additionalInfo != NULL); - snprintf(dst, dstBufLen, msgFormat1, additionalInfo); + snprintf(pBuf->buf, pBuf->len, msgFormat1, additionalInfo); return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; } @@ -103,10 +118,10 @@ int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalIn strncpy(buf, sourceStr, tListLen(buf) - 1); if (additionalInfo != NULL) { - snprintf(dst, dstBufLen, msgFormat2, buf, additionalInfo); + snprintf(pBuf->buf, pBuf->len, msgFormat2, buf, additionalInfo); } else { const char* msgFormat = (0 == strncmp(sourceStr, prefix, strlen(prefix))) ? msgFormat3 : msgFormat1; - snprintf(dst, dstBufLen, msgFormat, buf); + snprintf(pBuf->buf, pBuf->len, msgFormat, buf); } return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; @@ -1490,7 +1505,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild) { return pTableMeta; } -uint32_t getTableMetaSize(STableMeta* pTableMeta) { +uint32_t getTableMetaSize(const STableMeta* pTableMeta) { assert(pTableMeta != NULL); int32_t totalCols = 0; @@ -1505,7 +1520,7 @@ uint32_t getTableMetaMaxSize() { return sizeof(STableMeta) + TSDB_MAX_COLUMNS * sizeof(SSchema); } -STableMeta* tableMetaDup(STableMeta* pTableMeta) { +STableMeta* tableMetaDup(const STableMeta* pTableMeta) { assert(pTableMeta != NULL); size_t size = getTableMetaSize(pTableMeta); diff --git a/source/libs/parser/src/ttokenizer.c b/source/libs/parser/src/ttokenizer.c index 683fe142ef7b41e4b79eb0bdb52e7bdd4d2fa8cc..d69fa2b95b0feb6fdd8e57f54d34331070baca90 100644 --- a/source/libs/parser/src/ttokenizer.c +++ b/source/libs/parser/src/ttokenizer.c @@ -284,7 +284,7 @@ static int32_t tKeywordCode(const char* z, int n) { * Return the length of the token that begins at z[0]. * Store the token type in *type before returning. */ -uint32_t tGetToken(char* z, uint32_t* tokenId) { +uint32_t tGetToken(const char* z, uint32_t* tokenId) { uint32_t i; switch (*z) { case ' ': @@ -595,7 +595,7 @@ SToken tscReplaceStrToken(char **str, SToken *token, const char* newToken) { return ntoken; } -SToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) { +SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) { SToken t0 = {0}; // here we reach the end of sql string, null-terminated string @@ -689,13 +689,12 @@ void taosCleanupKeywordsTable() { } SToken taosTokenDup(SToken* pToken, char* buf, int32_t len) { - assert(pToken != NULL && buf != NULL); + assert(pToken != NULL && buf != NULL && len > pToken->n); + + strncpy(buf, pToken->z, pToken->n); + buf[pToken->n] = 0; + SToken token = *pToken; token.z = buf; - - assert(len > token.n); - strncpy(token.z, pToken->z, pToken->n); - token.z[token.n] = 0; - return token; } diff --git a/source/os/src/osString.c b/source/os/src/osString.c index 8054dc42be290a24657fe67531d192cdf5653a14..6063f816821ffc869a337e5c01b9e16bfa12efe5 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -37,7 +37,7 @@ char *taosCharsetReplace(char *charsetstr) { return strdup(charsetstr); } -int64_t taosStr2int64(char *str) { +int64_t taosStr2int64(const char *str) { char *endptr = NULL; return strtoll(str, &endptr, 10); } @@ -107,7 +107,7 @@ int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs) { return len; } -bool taosMbsToUcs4(char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len, int32_t *len) { +bool taosMbsToUcs4(const char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len, int32_t *len) { memset(ucs4, 0, ucs4_max_len); mbstate_t state = {0}; int32_t retlen = mbsnrtowcs((wchar_t *)ucs4, (const char **)&mbs, mbsLength, ucs4_max_len / 4, &state); diff --git a/source/util/src/tutil.c b/source/util/src/tutil.c index 3c3c8a1dfcef528ca5f81382f986665b6adc5995..e8a5cdd3011ace34a18c89fd5706150f37413c5c 100644 --- a/source/util/src/tutil.c +++ b/source/util/src/tutil.c @@ -166,7 +166,7 @@ char **strsplit(char *z, const char *delim, int32_t *num) { return split; } -char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) { +char *strnchr(const char *haystack, char needle, int32_t len, bool skipquote) { for (int32_t i = 0; i < len; ++i) { // skip the needle in quote, jump to the end of quoted string @@ -179,7 +179,7 @@ char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) { } if (haystack[i] == needle) { - return &haystack[i]; + return (char *)&haystack[i]; } }