提交 dcc672d0 编写于 作者: H Hongze Cheng

Merge branch '3.0' into feature/vnode

......@@ -12,6 +12,7 @@ debug/
release/
target/
debs/
deps/
rpms/
mac/
*.pyc
......
......@@ -544,7 +544,7 @@ void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) {
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, const void *value) {
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
SColIdx* pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
......
......@@ -44,10 +44,10 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);
int32_t parseAbsoluteDuration(char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);
int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision);
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
void deltaToUtcInitOnce();
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision);
......
......@@ -40,7 +40,7 @@ int32_t toInteger(const char* z, int32_t n, int32_t base, int64_t* value, bool*
bool taosVariantIsValid(SVariant *pVar);
void taosVariantCreate(SVariant *pVar, char* z, int32_t n, int32_t type);
void taosVariantCreate(SVariant *pVar, const char* z, int32_t n, int32_t type);
void taosVariantCreateFromBinary(SVariant *pVar, const char *pz, size_t len, uint32_t type);
......
......@@ -131,6 +131,18 @@ struct SInsertStmtInfo;
*/
bool qIsInsertSql(const char* pStr, size_t length);
typedef struct SParseContext {
const char* pSql; // sql string
size_t sqlLen; // length of the sql string
int64_t id; // operator id, generated by uuid generator
const char* pDbname;
const SEpSet* pEpSet;
int8_t schemaAttached; // denote if submit block is built with table schema or not
char* pMsg; // extended error message if exists to help avoid the problem in sql statement.
int32_t msgLen; // max length of the msg
} SParseContext;
/**
* Parse the sql statement and then return the SQueryStmtInfo as the result of bounded AST.
* @param pSql sql string
......@@ -141,16 +153,35 @@ bool qIsInsertSql(const char* pStr, size_t length);
*/
int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo** pQueryInfo, int64_t id, char* msg, int32_t msgLen);
typedef enum {
PAYLOAD_TYPE_KV = 0,
PAYLOAD_TYPE_RAW = 1,
} EPayloadType;
typedef struct SVgDataBlocks {
int64_t vgId; // virtual group id
int32_t numOfTables; // number of tables in current submit block
uint32_t size;
char *pData;
} SVgDataBlocks;
typedef struct SInsertStmtInfo {
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
} SInsertStmtInfo;
/**
* Parse the insert sql statement.
* @param pStr sql string
* @param length length of the sql string
* @param pInsertParam data in binary format to submit to vnode directly.
* @param id operator id, generated by uuid generator.
* @param msg extended error message if exists to help avoid the problem in sql statement.
* @return
* @return data in binary format to submit to vnode directly.
*/
int32_t qParseInsertSql(const char* pStr, size_t length, struct SInsertStmtInfo** pInsertInfo, int64_t id, char* msg, int32_t msgLen);
int32_t qParseInsertSql(SParseContext* pContext, struct SInsertStmtInfo** pInfo);
/**
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that
......
......@@ -38,11 +38,11 @@ extern "C" {
(dst)[(size)-1] = 0; \
} while (0)
int64_t taosStr2int64(char *str);
int64_t taosStr2int64(const char *str);
// USE_LIBICONV
int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs);
bool taosMbsToUcs4(char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len, int32_t *len);
bool taosMbsToUcs4(const char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len, int32_t *len);
int32_t tasoUcs4Compare(void *f1_ucs4, void *f2_ucs4, int32_t bytes, int8_t ncharSize);
bool taosValidateEncodec(const char *encodec);
char * taosCharsetReplace(char *charsetstr);
......
......@@ -29,7 +29,7 @@ int32_t strdequote(char *src);
int32_t strndequote(char *dst, const char *z, int32_t len);
int32_t strRmquote(char *z, int32_t len);
size_t strtrim(char *src);
char *strnchr(char *haystack, char needle, int32_t len, bool skipquote);
char *strnchr(const char *haystack, char needle, int32_t len, bool skipquote);
char **strsplit(char *src, const char *delim, int32_t *num);
char *strtolower(char *dst, const char *src);
char *strntolower(char *dst, const char *src, int32_t n);
......
......@@ -82,18 +82,18 @@ void deltaToUtcInitOnce() {
}
static int64_t parseFraction(char* str, char** end, int32_t timePrec);
static int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec, char delim);
static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim);
static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec);
static int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec);
static char* forwardToTimeStringEnd(char* str);
static bool checkTzPresent(char *str, int32_t len);
static bool checkTzPresent(const char *str, int32_t len);
static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t timePrec) = {
parseLocaltime,
parseLocaltimeDst
};
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) {
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) {
/* parse datatime string in with tz */
if (strnchr(timestr, 'T', len, false) != NULL) {
return parseTimeWithTz(timestr, time, timePrec, 'T');
......@@ -104,7 +104,7 @@ int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePre
}
}
bool checkTzPresent(char *str, int32_t len) {
bool checkTzPresent(const char *str, int32_t len) {
char *seg = forwardToTimeStringEnd(str);
int32_t seg_len = len - (int32_t)(seg - str);
......@@ -237,7 +237,7 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) {
* 2013-04-12T15:52:01+0800
* 2013-04-12T15:52:01.123+0800
*/
int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec, char delim) {
int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim) {
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
......@@ -432,7 +432,7 @@ static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t time
* d - Days (24 hours)
* w - Weeks (7 days)
*/
int32_t parseAbsoluteDuration(char* token, int32_t tokenlen, int64_t* duration, char* unit, int32_t timePrecision) {
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* duration, char* unit, int32_t timePrecision) {
errno = 0;
char* endPtr = NULL;
......
......@@ -75,7 +75,7 @@ int32_t toInteger(const char* z, int32_t n, int32_t base, int64_t* value, bool*
return 0;
}
void taosVariantCreate(SVariant *pVar, char* z, int32_t n, int32_t type) {
void taosVariantCreate(SVariant *pVar, const char* z, int32_t n, int32_t type) {
int32_t ret = 0;
memset(pVar, 0, sizeof(SVariant));
......
......@@ -275,6 +275,7 @@ FstNode* fstGetRoot(Fst *fst);
FstType fstGetType(Fst *fst);
CompiledAddr fstGetRootAddr(Fst *fst);
Output fstEmptyFinalOutput(Fst *fst, bool *null);
bool fstVerify(Fst *fst);
......@@ -291,11 +292,11 @@ typedef struct StreamState {
void streamStateDestroy(void *s);
typedef struct StreamWithState {
Fst *fst;
Automation *aut;
SArray *inp;
FstOutput emptyOutput;
SArray *stack; // <StreamState>
Fst *fst;
AutomationCtx *aut;
SArray *inp;
FstOutput emptyOutput;
SArray *stack; // <StreamState>
FstBoundWithData *endAt;
} StreamWithState ;
......@@ -310,19 +311,19 @@ StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *sta
void swsResultDestroy(StreamWithStateResult *result);
typedef void* (*StreamCallback)(void *);
StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) ;
StreamWithState *streamWithStateCreate(Fst *fst, AutomationCtx *automation, FstBoundWithData *min, FstBoundWithData *max) ;
void streamWithStateDestroy(StreamWithState *sws);
bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min);
StreamWithStateResult* streamWithStateNextWith(StreamWithState *sws, StreamCallback callback);
typedef struct FstStreamBuilder {
Fst *fst;
Automation *aut;
AutomationCtx *aut;
FstBoundWithData *min;
FstBoundWithData *max;
} FstStreamBuilder;
FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut);
FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, AutomationCtx *aut);
// set up bound range
// refator, simple code by marco
......
......@@ -19,33 +19,40 @@
extern "C" {
#endif
#include "index_fst_util.h"
typedef struct AutomationCtx AutomationCtx;
typedef enum AutomationType {
AUTOMATION_PREFIX,
AUTMMATION_MATCH
} AutomationType;
typedef struct StartWith {
AutomationCtx *autoSelf;
} StartWith;
typedef struct Complement {
AutomationCtx *autoSelf;
} Complement;
// automation
typedef struct AutomationCtx {
// automation interface
void *data;
AutomationType type;
} AutomationCtx;
typedef struct Automation {
void* (*start)() ;
bool (*isMatch)(void *);
bool (*canMatch)(void *data);
bool (*willAlwaysMatch)(void *state);
void* (*accept)(void *state, uint8_t byte);
void* (*acceptEof)(void *state);
void *data;
} Automation;
typedef struct AutomationFunc {
void* (*start)(AutomationCtx *ctx) ;
bool (*isMatch)(AutomationCtx *ctx, void *);
bool (*canMatch)(AutomationCtx *ctx, void *data);
bool (*willAlwaysMatch)(AutomationCtx *ctx, void *state);
void* (*accept)(AutomationCtx *ctx, void *state, uint8_t byte);
void* (*acceptEof)(AutomationCtx *ct, void *state);
} AutomationFunc;
AutomationCtx *automCtxCreate(void *data, AutomationType type);
void autoCtxDestroy(AutomationCtx *ctx);
extern AutomationFunc automFuncs[];
#ifdef __cplusplus
}
#endif
......
......@@ -1072,7 +1072,6 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) {
tOut += trn.out;
root = fstGetNode(fst, trn.addr);
taosArrayPush(nodes, &root);
//fstNodeDestroy(root);
}
if (!FST_NODE_IS_FINAL(root)) {
return false;
......@@ -1177,7 +1176,7 @@ void fstBoundDestroy(FstBoundWithData *bound) {
free(bound);
}
StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) {
StreamWithState *streamWithStateCreate(Fst *fst, AutomationCtx *automation, FstBoundWithData *min, FstBoundWithData *max) {
StreamWithState *sws = calloc(1, sizeof(StreamWithState));
if (sws == NULL) { return NULL; }
......@@ -1204,6 +1203,8 @@ void streamWithStateDestroy(StreamWithState *sws) {
}
bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
AutomationCtx *aut = sws->aut;
if (fstBoundWithDataIsEmpty(min)) {
if (fstBoundWithDataIsIncluded(min)) {
sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null));
......@@ -1211,7 +1212,7 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
StreamState s = {.node = fstGetRoot(sws->fst),
.trans = 0,
.out = {.null = false, .out = 0},
.autState = sws->aut->start()}; // auto.start callback
.autState = automFuncs[aut->type].start(aut)}; // auto.start callback
taosArrayPush(sws->stack, &s);
return true;
}
......@@ -1229,7 +1230,8 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
FstNode *node = fstGetRoot(sws->fst);
Output out = 0;
void* autState = sws->aut->start();
//void* autState = sws->aut->start();
void* autState = automFuncs[aut->type].start(aut);
int32_t len;
uint8_t *data = fstSliceData(key, &len);
......@@ -1241,7 +1243,8 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
FstTransition trn;
fstNodeGetTransitionAt(node, res, &trn);
void *preState = autState;
autState = sws->aut->accept(preState, b);
// autState = sws->aut->accept(preState, b);
autState = automFuncs[aut->type].accept(aut, preState, b);
taosArrayPush(sws->inp, &b);
StreamState s = {.node = node,
.trans = res + 1,
......@@ -1298,6 +1301,7 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
}
StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallback callback) {
AutomationCtx *aut = sws->aut;
FstOutput output = sws->emptyOutput;
if (output.null == false) {
FstSlice emptySlice = fstSliceCreate(NULL, 0);
......@@ -1306,15 +1310,15 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState));
return NULL;
}
void* start = sws->aut->start();
if (sws->aut->isMatch(start)) {
void *start = automFuncs[aut->type].start(aut);
if (automFuncs[aut->type].isMatch(aut, start)) {
FstSlice s = fstSliceCreate(NULL, 0);
return swsResultCreate(&s, output, callback(start));
}
}
while (taosArrayGetSize(sws->stack) > 0) {
StreamState *p = (StreamState *)taosArrayPop(sws->stack);
if (p->trans >= FST_NODE_LEN(p->node) || !sws->aut->canMatch(p->autState)) {
if (p->trans >= FST_NODE_LEN(p->node) || automFuncs[aut->type].canMatch(aut, p->autState)) {
if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) {
taosArrayPop(sws->inp);
}
......@@ -1324,16 +1328,18 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
FstTransition trn;
fstNodeGetTransitionAt(p->node, p->trans, &trn);
Output out = p->out.out + trn.out;
void* nextState = sws->aut->accept(p->autState, trn.inp);
void* nextState = automFuncs[aut->type].accept(aut, p->autState, trn.inp);
void* tState = callback(nextState);
bool isMatch = sws->aut->isMatch(nextState);
bool isMatch = automFuncs[aut->type].isMatch(aut, nextState);
//bool isMatch = sws->aut->isMatch(nextState);
FstNode *nextNode = fstGetNode(sws->fst, trn.addr);
taosArrayPush(sws->inp, &(trn.inp));
if (FST_NODE_IS_FINAL(nextNode)) {
void *eofState = sws->aut->acceptEof(nextState);
//void *eofState = sws->aut->acceptEof(nextState);
void *eofState = automFuncs[aut->type].acceptEof(aut, nextState);
if (eofState != NULL) {
isMatch = sws->aut->isMatch(eofState);
isMatch = automFuncs[aut->type].isMatch(aut, eofState);
}
}
StreamState s1 = { .node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState};
......@@ -1391,7 +1397,7 @@ void streamStateDestroy(void *s) {
//free(s->autoState);
}
FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut) {
FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, AutomationCtx *aut) {
FstStreamBuilder *b = calloc(1, sizeof(FstStreamBuilder));
if (NULL == b) { return NULL; }
......
......@@ -13,3 +13,85 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "index_fst_automation.h"
// prefix query, impl later
static void* prefixStart(AutomationCtx *ctx) {
return NULL;
};
static bool prefixIsMatch(AutomationCtx *ctx, void *data) {
return true;
}
static bool prefixCanMatch(AutomationCtx *ctx, void *data) {
return true;
}
static bool prefixWillAlwaysMatch(AutomationCtx *ctx, void *state) {
return true;
}
static void* prefixAccept(AutomationCtx *ctx, void *state, uint8_t byte) {
return NULL;
}
static void* prefixAcceptEof(AutomationCtx *ctx, void *state) {
return NULL;
}
// pattern query, impl later
static void* patternStart(AutomationCtx *ctx) {
return NULL;
}
static bool patternIsMatch(AutomationCtx *ctx, void *data) {
return true;
}
static bool patternCanMatch(AutomationCtx *ctx, void *data) {
return true;
}
static bool patternWillAlwaysMatch(AutomationCtx *ctx, void *state) {
return true;
}
static void* patternAccept(AutomationCtx *ctx, void *state, uint8_t byte) {
return NULL;
}
static void* patternAcceptEof(AutomationCtx *ctx, void *state) {
return NULL;
}
AutomationFunc automFuncs[] = {{
prefixStart,
prefixIsMatch,
prefixCanMatch,
prefixWillAlwaysMatch,
prefixAccept,
prefixAcceptEof
},
{
patternStart,
patternIsMatch,
patternCanMatch,
patternWillAlwaysMatch,
patternAccept,
patternAcceptEof
}
// add more search type
};
AutomationCtx* automCtxCreate(void *data, AutomationType type) {
AutomationCtx *ctx = calloc(1, sizeof(AutomationCtx));
if (ctx == NULL) { return NULL; }
ctx->type = type;
if (ctx->type == AUTOMATION_PREFIX) {
} else if (ctx->type == AUTMMATION_MATCH) {
} else {
// add more search type
}
return ctx;
}
void autoCtxDestroy(AutomationCtx *ctx) {
free(ctx);
}
......@@ -65,6 +65,7 @@ class FstReadMemory {
~FstReadMemory() {
fstCountingWriterDestroy(_w);
fstDestroy(_fst);
fstSliceDestroy(&_s);
}
......@@ -129,10 +130,12 @@ class FstReadMemory {
//}
#define L 100
#define M 100
#define N 100
int Performance_fstWriteRecords(FstWriter *b) {
std::string str("aa");
int L = 100, M = 100, N = 10;
for (int i = 0; i < L; i++) {
str[0] = 'a' + i;
str.resize(2);
......@@ -150,22 +153,29 @@ int Performance_fstWriteRecords(FstWriter *b) {
}
void Performance_fstReadRecords(FstReadMemory *m) {
std::string str("a");
for (int i = 0; i < 50; i++) {
//std::string str("aa");
str.push_back('a');
uint64_t out, cost;
bool ok = m->GetWithTimeCostUs(str, &out, &cost);
if (ok == true) {
printf("success to get (%s, %" PRId64"), time cost: %" PRId64")\n", str.c_str(), out, cost);
} else {
printf("failed to get(%s)\n", str.c_str());
}
}
std::string str("aa");
for (int i = 0; i < M; i++) {
str[0] = 'a' + i;
str.resize(2);
for(int j = 0; j < N; j++) {
str[1] = 'a' + j;
str.resize(2);
for (int k = 0; k < L; k++) {
str.push_back('a');
uint64_t val, cost;
if (m->GetWithTimeCostUs(str, &val, &cost)) {
printf("succes to get kv(%s, %" PRId64"), cost: %" PRId64"\n", str.c_str(), val, cost);
} else {
printf("failed to get key: %s\n", str.c_str());
}
}
}
}
}
void checkFstPerf() {
FstWriter *fw = new FstWriter;
int64_t s = taosGetTimestampUs();
int num = Performance_fstWriteRecords(fw);
int64_t e = taosGetTimestampUs();
printf("write %d record cost %" PRId64"us\n", num, e - s);
......@@ -173,13 +183,11 @@ void checkFstPerf() {
FstReadMemory *m = new FstReadMemory(1024 * 64);
if (m->init()) {
uint64_t val;
if(m->Get("aaaaaaa", &val)) {
std::cout << "succes to Get val: " << val << std::endl;
} else {
std::cout << "failed to Get " << std::endl;
}
printf("success to init fst read");
}
Performance_fstReadRecords(m);
delete m;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DATABLOCKMGT_H
#define TDENGINE_DATABLOCKMGT_H
#include "catalog.h"
#include "os.h"
#include "ttypes.h"
#include "tname.h"
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
typedef enum EOrderStatus {
ORDER_STATUS_UNKNOWN = 0,
ORDER_STATUS_ORDERED = 1,
ORDER_STATUS_DISORDERED = 2,
} EOrderStatus;
typedef enum EValStat {
VAL_STAT_HAS = 0x0, // 0 means has val
VAL_STAT_NONE = 0x01, // 1 means no val
} EValStat;
typedef enum ERowCompareStat {
ROW_COMPARE_NO_NEED = 0,
ROW_COMPARE_NEED = 1,
} ERowCompareStat;
typedef struct SBoundColumn {
int32_t offset; // all column offset value
int32_t toffset; // first part offset for SDataRow TODO: get offset from STSchema on future
uint8_t valStat; // EValStat. denote if current column bound or not(0 means has val, 1 means no val)
} SBoundColumn;
typedef struct {
uint16_t schemaColIdx;
uint16_t boundIdx;
uint16_t finalIdx;
} SBoundIdxInfo;
typedef struct SParsedDataColInfo {
int16_t numOfCols;
int16_t numOfBound;
uint16_t flen; // TODO: get from STSchema
uint16_t allNullLen; // TODO: get from STSchema
uint16_t extendedVarLen;
int32_t *boundedColumns; // bound column idx according to schema
SBoundColumn *cols;
SBoundIdxInfo *colIdxInfo;
int8_t orderStatus; // bound columns
} SParsedDataColInfo;
typedef struct SMemRowInfo {
int32_t dataLen; // len of SDataRow
int32_t kvLen; // len of SKVRow
} SMemRowInfo;
typedef struct {
uint8_t memRowType; // default is 0, that is SDataRow
uint8_t compareStat; // 0 no need, 1 need compare
TDRowTLenT kvRowInitLen;
SMemRowInfo *rowInfo;
} SMemRowBuilder;
typedef struct SParamInfo {
int32_t idx;
uint8_t type;
uint8_t timePrec;
int16_t bytes;
uint32_t offset;
} SParamInfo;
typedef struct STableDataBlocks {
SName tableName;
int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not
int64_t vgId; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfTables; // number of tables in current submit block
int32_t rowSize; // row size for current table
uint32_t nAllocSize;
uint32_t headerSize; // header for table info (uid, tid, submit metadata)
uint32_t size;
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
char *pData;
bool cloned;
STagData tagData;
SParsedDataColInfo boundColumnInfo;
// for parameter ('?') binding
uint32_t numOfAllocedParams;
uint32_t numOfParams;
SParamInfo * params;
SMemRowBuilder rowBuilder;
} STableDataBlocks;
static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) {
memRowSetType(row, memRowType);
if (isDataRowT(memRowType)) {
dataRowSetVersion(memRowDataBody(row), pBlock->pTableMeta->sversion);
dataRowSetLen(memRowDataBody(row), (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pBlock->boundColumnInfo.flen));
} else {
ASSERT(nBoundCols > 0);
memRowSetKvVersion(row, pBlock->pTableMeta->sversion);
kvRowSetNCols(memRowKvBody(row), nBoundCols);
kvRowSetLen(memRowKvBody(row), (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nBoundCols));
}
}
static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
ASSERT(pBlock->rowSize == pBlock->pTableMeta->tableInfo.rowSize);
return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen;
}
// Applicable to consume by one row
static FORCE_INLINE void appendMemRowColValEx(SMemRow row, const void *value, bool isCopyVarData, int16_t colId,
int8_t colType, int32_t toffset, int32_t *dataLen, int32_t *kvLen,
uint8_t compareStat) {
tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset);
if (compareStat == ROW_COMPARE_NEED) {
tdGetColAppendDeltaLen(value, colType, dataLen, kvLen);
}
}
static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowType, SParsedDataColInfo *spd,
int32_t idx, int32_t *toffset) {
int32_t schemaIdx = 0;
if (IS_DATA_COL_ORDERED(spd)) {
schemaIdx = spd->boundedColumns[idx];
if (isDataRowT(memRowType)) {
*toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart
} else {
*toffset = idx * sizeof(SColIdx); // the offset of SColIdx
}
} else {
ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx);
schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx;
if (isDataRowT(memRowType)) {
*toffset = (spd->cols + schemaIdx)->toffset;
} else {
*toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SColIdx);
}
}
}
static FORCE_INLINE void convertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) {
if (isDataRow(row)) {
if (kvLen < (dataLen * KVRatioConvert)) {
memRowSetConvert(row);
}
} else if (kvLen > dataLen) {
memRowSetConvert(row);
}
}
static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
pBlocks->tid = pTableMeta->suid;
pBlocks->uid = pTableMeta->uid;
pBlocks->sversion = pTableMeta->sversion;
if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
return TSDB_CODE_TSC_INVALID_OPERATION;
} else {
pBlocks->numOfRows += numOfRows;
return TSDB_CODE_SUCCESS;
}
}
int32_t schemaIdxCompar(const void *lhs, const void *rhs);
int32_t boundIdxCompar(const void *lhs, const void *rhs);
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols);
void destroyBoundColumnInfo(SParsedDataColInfo* pColList);
int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen);
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
SName* name, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList);
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap);
#endif // TDENGINE_DATABLOCKMGT_H
......@@ -16,4 +16,8 @@
#ifndef TDENGINE_INSERTPARSER_H
#define TDENGINE_INSERTPARSER_H
#include "parser.h"
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo);
#endif // TDENGINE_INSERTPARSER_H
......@@ -26,14 +26,6 @@ extern "C" {
struct SSqlNode;
typedef struct SInsertStmtInfo {
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
int8_t schemaAttached; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
char *sql; // current sql statement position
} SInsertStmtInfo;
typedef struct SInternalField {
TAOS_FIELD field;
......
......@@ -46,7 +46,7 @@ SInternalField* getInternalField(SFieldInfo* pFieldInfo, int32_t index);
int32_t parserValidateIdToken(SToken* pToken);
int32_t buildInvalidOperationMsg(SMsgBuf* pMsgBuf, const char* msg);
int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalInfo, const char* sourceStr);
int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr);
STableMetaInfo* addEmptyMetaInfo(SQueryStmtInfo* pQueryInfo);
......@@ -61,6 +61,8 @@ void cleanupColumnCond(SArray** pCond);
uint32_t convertRelationalOperator(SToken *pToken);
int32_t getExprFunctionId(SExprInfo *pExprInfo);
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
#ifdef __cplusplus
}
#endif
......
......@@ -44,7 +44,7 @@ typedef struct SToken {
* @param tokenType
* @return
*/
uint32_t tGetToken(char *z, uint32_t *tokenType);
uint32_t tGetToken(const char *z, uint32_t *tokenType);
/**
* enhanced tokenizer for sql string.
......@@ -54,7 +54,7 @@ uint32_t tGetToken(char *z, uint32_t *tokenType);
* @param isPrevOptr
* @return
*/
SToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr);
SToken tStrGetToken(const char *str, int32_t *i, bool isPrevOptr);
/**
* check if it is a keyword or not
......
......@@ -191,7 +191,7 @@ tSqlExpr *tSqlExprCreate(tSqlExpr *pLeft, tSqlExpr *pRight, int32_t optrType) {
pExpr->type = SQL_NODE_EXPR;
if (pLeft != NULL && pRight != NULL && (optrType != TK_IN)) {
char* endPos = pRight->exprToken.z + pRight->exprToken.n;
const char* endPos = pRight->exprToken.z + pRight->exprToken.n;
pExpr->exprToken.z = pLeft->exprToken.z;
pExpr->exprToken.n = (uint32_t)(endPos - pExpr->exprToken.z);
pExpr->exprToken.type = pLeft->exprToken.type;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dataBlockMgt.h"
#include "catalog.h"
#include "parserUtil.h"
#include "queryInfoUtil.h"
#include "taosmsg.h"
#define IS_RAW_PAYLOAD(t) \
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
typedef struct SBlockKeyTuple {
TSKEY skey;
void* payloadAddr;
} SBlockKeyTuple;
typedef struct SBlockKeyInfo {
int32_t maxBytesAlloc;
SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo;
static int32_t rowDataCompar(const void *lhs, const void *rhs) {
TSKEY left = *(TSKEY *)lhs;
TSKEY right = *(TSKEY *)rhs;
if (left == right) {
return 0;
} else {
return left > right ? 1 : -1;
}
}
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols) {
pColList->numOfCols = numOfCols;
pColList->numOfBound = numOfCols;
pColList->orderStatus = ORDER_STATUS_ORDERED; // default is ORDERED for non-bound mode
pColList->boundedColumns = calloc(pColList->numOfCols, sizeof(int32_t));
pColList->cols = calloc(pColList->numOfCols, sizeof(SBoundColumn));
pColList->colIdxInfo = NULL;
pColList->flen = 0;
pColList->allNullLen = 0;
int32_t nVar = 0;
for (int32_t i = 0; i < pColList->numOfCols; ++i) {
uint8_t type = pSchema[i].type;
if (i > 0) {
pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes;
pColList->cols[i].toffset = pColList->flen;
}
pColList->flen += TYPE_BYTES[type];
switch (type) {
case TSDB_DATA_TYPE_BINARY:
pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
++nVar;
break;
case TSDB_DATA_TYPE_NCHAR:
pColList->allNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
++nVar;
break;
default:
break;
}
pColList->boundedColumns[i] = pSchema[i].colId;
}
pColList->allNullLen += pColList->flen;
pColList->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
}
int32_t schemaIdxCompar(const void *lhs, const void *rhs) {
uint16_t left = *(uint16_t *)lhs;
uint16_t right = *(uint16_t *)rhs;
if (left == right) {
return 0;
} else {
return left > right ? 1 : -1;
}
}
int32_t boundIdxCompar(const void *lhs, const void *rhs) {
uint16_t left = *(uint16_t *)POINTER_SHIFT(lhs, sizeof(uint16_t));
uint16_t right = *(uint16_t *)POINTER_SHIFT(rhs, sizeof(uint16_t));
if (left == right) {
return 0;
} else {
return left > right ? 1 : -1;
}
}
void destroyBoundColumnInfo(SParsedDataColInfo* pColList) {
tfree(pColList->boundedColumns);
tfree(pColList->cols);
tfree(pColList->colIdxInfo);
}
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, SName* name,
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks) {
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
if (dataBuf == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
dataBuf->nAllocSize = (uint32_t)defaultSize;
dataBuf->headerSize = startOffset;
// the header size will always be the startOffset value, reserved for the subumit block header
if (dataBuf->nAllocSize <= dataBuf->headerSize) {
dataBuf->nAllocSize = dataBuf->headerSize * 2;
}
//dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf->pData = malloc(dataBuf->nAllocSize);
if (dataBuf->pData == NULL) {
tfree(dataBuf);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memset(dataBuf->pData, 0, sizeof(SSubmitBlk));
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->pTableMeta = tableMetaDup(pTableMeta);
SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
SSchema* pSchema = getTableColumnSchema(dataBuf->pTableMeta);
setBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);
dataBuf->ordered = true;
dataBuf->prevTS = INT64_MIN;
dataBuf->rowSize = rowSize;
dataBuf->size = startOffset;
dataBuf->tsSource = -1;
dataBuf->vgId = dataBuf->pTableMeta->vgId;
tNameAssign(&dataBuf->tableName, name);
assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
*dataBlocks = dataBuf;
return TSDB_CODE_SUCCESS;
}
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
SName* name, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks,
SArray* pBlockList) {
*dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
if (t1 != NULL) {
*dataBlocks = *t1;
}
if (*dataBlocks == NULL) {
int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, name, pTableMeta, dataBlocks);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
if (pBlockList) {
taosArrayPush(pBlockList, dataBlocks);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
int32_t result = TD_MEM_ROW_DATA_HEAD_SIZE;
int32_t columns = getNumOfColumns(pTableMeta);
SSchema* pSchema = getTableColumnSchema(pTableMeta);
for (int32_t i = 0; i < columns; i++) {
if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
}
}
return result;
}
/**
* TODO: Move to tdataformat.h and refactor when STSchema available.
* - fetch flen and toffset from STSChema and remove param spd
*/
static FORCE_INLINE void convertToSDataRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, SParsedDataColInfo *spd) {
ASSERT(isKvRow(src));
SKVRow kvRow = memRowKvBody(src);
SDataRow dataRow = memRowDataBody(dest);
memRowSetType(dest, SMEM_ROW_DATA);
dataRowSetVersion(dataRow, memRowKvVersion(src));
dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + spd->flen));
int32_t kvIdx = 0;
for (int i = 0; i < nCols; ++i) {
SSchema *schema = pSchema + i;
void * val = tdGetKVRowValOfColEx(kvRow, schema->colId, &kvIdx);
tdAppendDataColVal(dataRow, val != NULL ? val : getNullValue(schema->type), true, schema->type,
(spd->cols + i)->toffset);
}
}
// TODO: Move to tdataformat.h and refactor when STSchema available.
static FORCE_INLINE void convertToSKVRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, int nBoundCols, SParsedDataColInfo *spd) {
ASSERT(isDataRow(src));
SDataRow dataRow = memRowDataBody(src);
SKVRow kvRow = memRowKvBody(dest);
memRowSetType(dest, SMEM_ROW_KV);
memRowSetKvVersion(kvRow, dataRowVersion(dataRow));
kvRowSetNCols(kvRow, nBoundCols);
kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nBoundCols));
int32_t toffset = 0, kvOffset = 0;
for (int i = 0; i < nCols; ++i) {
if ((spd->cols + i)->valStat == VAL_STAT_HAS) {
SSchema *schema = pSchema + i;
toffset = (spd->cols + i)->toffset;
void *val = tdGetRowDataOfCol(dataRow, schema->type, toffset + TD_DATA_ROW_HEAD_SIZE);
tdAppendKvColVal(kvRow, val, true, schema->colId, schema->type, kvOffset);
kvOffset += sizeof(SColIdx);
}
}
}
// TODO: Move to tdataformat.h and refactor when STSchema available.
static FORCE_INLINE void convertSMemRow(SMemRow dest, SMemRow src, STableDataBlocks *pBlock) {
STableMeta * pTableMeta = pBlock->pTableMeta;
STableComInfo tinfo = getTableInfo(pTableMeta);
SSchema * pSchema = getTableColumnSchema(pTableMeta);
SParsedDataColInfo *spd = &pBlock->boundColumnInfo;
ASSERT(dest != src);
if (isDataRow(src)) {
// TODO: Can we use pBlock -> numOfParam directly?
ASSERT(spd->numOfBound > 0);
convertToSKVRow(dest, src, pSchema, tinfo.numOfColumns, spd->numOfBound, spd);
} else {
convertToSDataRow(dest, src, pSchema, tinfo.numOfColumns, spd);
}
}
void destroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
if (pDataBlock == NULL) {
return;
}
tfree(pDataBlock->pData);
if (removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pDataBlock->tableName, name);
// taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
}
if (!pDataBlock->cloned) {
tfree(pDataBlock->params);
// free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) {
tfree(pDataBlock->pTableMeta);
}
destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
}
tfree(pDataBlock);
}
void* destroyBlockArrayList(SArray* pDataBlockList) {
if (pDataBlockList == NULL) {
return NULL;
}
size_t size = taosArrayGetSize(pDataBlockList);
for (int32_t i = 0; i < size; i++) {
void* d = taosArrayGetP(pDataBlockList, i);
destroyDataBlock(d, false);
}
taosArrayDestroy(pDataBlockList);
return NULL;
}
// data block is disordered, sort it in ascending order
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) {
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
// size is less than the total size, since duplicated rows may be removed yet.
assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);
if (!dataBuf->ordered) {
char *pBlockData = pBlocks->data;
qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
int32_t i = 0;
int32_t j = 1;
while (j < pBlocks->numOfRows) {
TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
if (ti == tj) {
++j;
continue;
}
int32_t nextPos = (++i);
if (nextPos != j) {
memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
}
++j;
}
dataBuf->ordered = true;
pBlocks->numOfRows = i + 1;
dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
}
dataBuf->prevTS = INT64_MIN;
}
// data block is disordered, sort it in ascending order
int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKeyInfo) {
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
int16_t nRows = pBlocks->numOfRows;
// size is less than the total size, since duplicated rows may be removed yet.
// allocate memory
size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
char *tmp = realloc(pBlkKeyInfo->pKeyTuple, nAlloc);
if (tmp == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple *)tmp;
pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
}
memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);
int32_t extendedRowSize = getExtendedRowSize(dataBuf);
SBlockKeyTuple *pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
char * pBlockData = pBlocks->data;
int n = 0;
while (n < nRows) {
pBlkKeyTuple->skey = memRowKey(pBlockData);
pBlkKeyTuple->payloadAddr = pBlockData;
// next loop
pBlockData += extendedRowSize;
++pBlkKeyTuple;
++n;
}
if (!dataBuf->ordered) {
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar);
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
int32_t i = 0;
int32_t j = 1;
while (j < nRows) {
TSKEY ti = (pBlkKeyTuple + i)->skey;
TSKEY tj = (pBlkKeyTuple + j)->skey;
if (ti == tj) {
++j;
continue;
}
int32_t nextPos = (++i);
if (nextPos != j) {
memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
}
++j;
}
dataBuf->ordered = true;
pBlocks->numOfRows = i + 1;
}
dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
dataBuf->prevTS = INT64_MIN;
return 0;
}
// Erase the empty space reserved for binary data
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, int8_t schemaAttached, bool isRawPayload) {
// TODO: optimize this function, handle the case while binary is not presented
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableComInfo tinfo = getTableInfo(pTableMeta);
SSchema* pSchema = getTableColumnSchema(pTableMeta);
SSubmitBlk* pBlock = pDataBlock;
memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk));
pDataBlock = (char*)pDataBlock + sizeof(SSubmitBlk);
int32_t flen = 0; // original total length of row
// schema needs to be included into the submit data block
if (schemaAttached) {
int32_t numOfCols = getNumOfColumns(pTableDataBlock->pTableMeta);
for(int32_t j = 0; j < numOfCols; ++j) {
STColumn* pCol = (STColumn*) pDataBlock;
pCol->colId = htons(pSchema[j].colId);
pCol->type = pSchema[j].type;
pCol->bytes = htons(pSchema[j].bytes);
pCol->offset = 0;
pDataBlock = (char*)pDataBlock + sizeof(STColumn);
flen += TYPE_BYTES[pSchema[j].type];
}
int32_t schemaSize = sizeof(STColumn) * numOfCols;
pBlock->schemaLen = schemaSize;
} else {
if (isRawPayload) {
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
flen += TYPE_BYTES[pSchema[j].type];
}
}
pBlock->schemaLen = 0;
}
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
pBlock->dataLen = 0;
int32_t numOfRows = htons(pBlock->numOfRows);
if (isRawPayload) {
for (int32_t i = 0; i < numOfRows; ++i) {
SMemRow memRow = (SMemRow)pDataBlock;
memRowSetType(memRow, SMEM_ROW_DATA);
SDataRow trow = memRowDataBody(memRow);
dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + flen));
dataRowSetVersion(trow, pTableMeta->sversion);
int toffset = 0;
for (int32_t j = 0; j < tinfo.numOfColumns; j++) {
tdAppendColVal(trow, p, pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
p += pSchema[j].bytes;
}
pDataBlock = (char*)pDataBlock + memRowTLen(memRow);
pBlock->dataLen += memRowTLen(memRow);
}
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
char* payload = (blkKeyTuple + i)->payloadAddr;
if (isNeedConvertRow(payload)) {
convertSMemRow(pDataBlock, payload, pTableDataBlock);
TDRowTLenT rowTLen = memRowTLen(pDataBlock);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
} else {
TDRowTLenT rowTLen = memRowTLen(payload);
memcpy(pDataBlock, payload, rowTLen);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
}
}
}
int32_t len = pBlock->dataLen + pBlock->schemaLen;
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
return len;
}
static void extractTableNameList(SHashObj* pHashObj, bool freeBlockMap) {
// todo
}
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
int code = 0;
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
STableDataBlocks** p = taosHashIterate(pHashObj, NULL);
STableDataBlocks* pOneTableBlock = *p;
SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock
while (pOneTableBlock) {
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
if (pBlocks->numOfRows > 0) {
STableDataBlocks* dataBuf = NULL;
int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
INSERT_HEAD_SIZE, 0, &pOneTableBlock->tableName, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
if (ret != TSDB_CODE_SUCCESS) {
taosHashCleanup(pVnodeDataBlockHashList);
destroyBlockArrayList(pVnodeDataBlockList);
tfree(blkKeyInfo.pKeyTuple);
return ret;
}
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);
if (dataBuf->nAllocSize < destSize) {
dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize);
if (tmp != NULL) {
dataBuf->pData = tmp;
} else { // failed to allocate memory, free already allocated memory and return error code
taosHashCleanup(pVnodeDataBlockHashList);
destroyBlockArrayList(pVnodeDataBlockList);
tfree(dataBuf->pData);
tfree(blkKeyInfo.pKeyTuple);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
if (isRawPayload) {
sortRemoveDataBlockDupRowsRaw(pOneTableBlock);
} else {
if ((code = sortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) {
taosHashCleanup(pVnodeDataBlockHashList);
destroyBlockArrayList(pVnodeDataBlockList);
tfree(dataBuf->pData);
tfree(blkKeyInfo.pKeyTuple);
return code;
}
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
}
int32_t len = pBlocks->numOfRows *
(isRawPayload ? (pOneTableBlock->rowSize + expandSize) : getExtendedRowSize(pOneTableBlock)) +
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);
pBlocks->tid = htonl(pBlocks->tid);
pBlocks->uid = htobe64(pBlocks->uid);
pBlocks->sversion = htonl(pBlocks->sversion);
pBlocks->numOfRows = htons(pBlocks->numOfRows);
pBlocks->schemaLen = 0;
// erase the empty space reserved for binary data
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, schemaAttached, isRawPayload);
assert(finalLen <= len);
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
assert(dataBuf->size <= dataBuf->nAllocSize);
// the length does not include the SSubmitBlk structure
pBlocks->dataLen = htonl(finalLen);
dataBuf->numOfTables += 1;
pBlocks->numOfRows = 0;
}
p = taosHashIterate(pHashObj, p);
if (p == NULL) {
break;
}
pOneTableBlock = *p;
}
extractTableNameList(pHashObj, freeBlockMap);
// free the table data blocks;
// pInsertParam->pDataBlocks = pVnodeDataBlockList;
taosHashCleanup(pVnodeDataBlockHashList);
tfree(blkKeyInfo.pKeyTuple);
return TSDB_CODE_SUCCESS;
}
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
const int factor = 5;
uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
// expand the allocated size
if (remain < rowSize * factor) {
while (remain < rowSize * factor) {
pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
remain = pDataBlock->nAllocSize - pDataBlock->size;
}
char *tmp = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
if (tmp != NULL) {
pDataBlock->pData = tmp;
memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
} else {
// do nothing, if allocate more memory failed
pDataBlock->nAllocSize = nAllocSizeOld;
*numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
*numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
return TSDB_CODE_SUCCESS;
}
int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen) {
ASSERT(nRows >= 0 && nCols > 0 && (nBoundCols <= nCols));
if (nRows > 0) {
// already init(bind multiple rows by single column)
if (pBuilder->compareStat == ROW_COMPARE_NEED && (pBuilder->rowInfo != NULL)) {
return TSDB_CODE_SUCCESS;
}
}
// default compareStat is ROW_COMPARE_NO_NEED
if (nBoundCols == 0) { // file input
pBuilder->memRowType = SMEM_ROW_DATA;
return TSDB_CODE_SUCCESS;
} else {
float boundRatio = ((float)nBoundCols / (float)nCols);
if (boundRatio < KVRatioKV) {
pBuilder->memRowType = SMEM_ROW_KV;
return TSDB_CODE_SUCCESS;
} else if (boundRatio > KVRatioData) {
pBuilder->memRowType = SMEM_ROW_DATA;
return TSDB_CODE_SUCCESS;
}
pBuilder->compareStat = ROW_COMPARE_NEED;
if (boundRatio < KVRatioPredict) {
pBuilder->memRowType = SMEM_ROW_KV;
} else {
pBuilder->memRowType = SMEM_ROW_DATA;
}
}
pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx);
if (nRows > 0) {
pBuilder->rowInfo = calloc(nRows, sizeof(SMemRowInfo));
if (pBuilder->rowInfo == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int i = 0; i < nRows; ++i) {
(pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen;
(pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen;
}
}
return TSDB_CODE_SUCCESS;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "insertParser.h"
#include "dataBlockMgt.h"
#include "parserInt.h"
#include "parserUtil.h"
#include "queryInfoUtil.h"
#include "tglobal.h"
#include "ttime.h"
#include "ttoken.h"
#include "ttypes.h"
#define NEXT_TOKEN(pSql, sToken) \
do { \
int32_t index = 0; \
sToken = tStrGetToken(pSql, &index, false); \
pSql += index; \
} while (0)
#define CHECK_CODE(expr) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
terrno = code; \
return terrno; \
} \
} while (0)
#define CHECK_CODE_1(expr, destroy) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
(void)destroy; \
terrno = code; \
return terrno; \
} \
} while (0)
#define CHECK_CODE_2(expr, destroy1, destroy2) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
(void)destroy1; \
(void)destroy2; \
terrno = code; \
return terrno; \
} \
} while (0)
enum {
TSDB_USE_SERVER_TS = 0,
TSDB_USE_CLI_TS = 1,
};
typedef struct SInsertParseContext {
SParseContext* pComCxt;
const char* pSql;
SMsgBuf msg;
struct SCatalog* pCatalog;
SMetaData meta; // need release
const STableMeta* pTableMeta;
SHashObj* pTableBlockHashObj; // data block for each table. need release
int32_t totalNum;
SInsertStmtInfo* pOutput;
} SInsertParseContext;
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
static bool isNullStr(SToken *pToken) {
return (pToken->type == TK_NULL) || ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
}
static FORCE_INLINE int32_t toDouble(SToken *pToken, double *value, char **endPtr) {
errno = 0;
*value = strtold(pToken->z, endPtr);
// not a valid integer number, return error
if ((*endPtr - pToken->z) != pToken->n) {
return TK_ILLEGAL;
}
return pToken->type;
}
static int32_t toInt64(const char* z, int16_t type, int32_t n, int64_t* value, bool issigned) {
errno = 0;
int32_t ret = 0;
char* endPtr = NULL;
if (type == TK_FLOAT) {
double v = strtod(z, &endPtr);
if ((errno == ERANGE && v == HUGE_VALF) || isinf(v) || isnan(v)) {
ret = -1;
} else if ((issigned && (v < INT64_MIN || v > INT64_MAX)) || ((!issigned) && (v < 0 || v > UINT64_MAX))) {
ret = -1;
} else {
*value = (int64_t) round(v);
}
errno = 0;
return ret;
}
int32_t radix = 10;
if (type == TK_HEX) {
radix = 16;
} else if (type == TK_BIN) {
radix = 2;
}
// the string may be overflow according to errno
if (!issigned) {
const char *p = z;
while(*p != 0 && *p == ' ') p++;
if (*p != 0 && *p == '-') { return -1;}
*value = strtoull(z, &endPtr, radix);
} else {
*value = strtoll(z, &endPtr, radix);
}
// not a valid integer number, return error
if (endPtr - z != n || errno == ERANGE) {
ret = -1;
}
errno = 0;
return ret;
}
static int32_t createInsertStmtInfo(SInsertStmtInfo **pInsertInfo) {
SInsertStmtInfo *info = calloc(1, sizeof(SQueryStmtInfo));
if (NULL == info) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// info->pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
// if (NULL == info->pTableBlockHashList) {
// tfree(info);
// return TSDB_CODE_TSC_OUT_OF_MEMORY;
// }
*pInsertInfo = info;
return TSDB_CODE_SUCCESS;
}
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
SToken sToken;
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_INSERT != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "keyword INSERT is expected", sToken.z);
}
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_INTO != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "keyword INTO is expected", sToken.z);
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildTableName(SInsertParseContext* pCxt, SToken* pStname, SArray* tableNameList) {
if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(&pCxt->msg, "invalid table name");
}
SName name = {0};
strndequote(name.tname, pStname->z, pStname->n);
taosArrayPush(tableNameList, &name);
return TSDB_CODE_SUCCESS;
}
static int32_t buildMetaReq(SInsertParseContext* pCxt, SToken* pStname, SMetaReq* pMetaReq) {
pMetaReq->pTableName = taosArrayInit(4, sizeof(SName));
return buildTableName(pCxt, pStname, pMetaReq->pTableName);
}
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
SMetaReq req;
CHECK_CODE(buildMetaReq(pCxt, pTname, &req));
CHECK_CODE(catalogGetMetaData(pCxt->pCatalog, &req, &pCxt->meta));
pCxt->pTableMeta = (STableMeta*)taosArrayGetP(pCxt->meta.pTableMeta, 0);
return TSDB_CODE_SUCCESS;
}
// todo speedup by using hash list
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
while (start < end) {
if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
return start;
}
++start;
}
return -1;
}
static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
// once the data block is disordered, we do NOT keep previous timestamp any more
if (!pDataBlocks->ordered) {
return TSDB_CODE_SUCCESS;
}
TSKEY k = *(TSKEY *)start;
if (k == INT64_MIN) {
if (pDataBlocks->tsSource == TSDB_USE_CLI_TS) {
return -1;
} else if (pDataBlocks->tsSource == -1) {
pDataBlocks->tsSource = TSDB_USE_SERVER_TS;
}
} else {
if (pDataBlocks->tsSource == TSDB_USE_SERVER_TS) {
return -1; // client time/server time can not be mixed
} else if (pDataBlocks->tsSource == -1) {
pDataBlocks->tsSource = TSDB_USE_CLI_TS;
}
}
if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) {
pDataBlocks->ordered = false;
}
pDataBlocks->prevTS = k;
return TSDB_CODE_SUCCESS;
}
static int parseTime(SInsertParseContext* pCxt, SToken *pToken, int16_t timePrec, int64_t *time) {
int32_t index = 0;
SToken sToken;
int64_t interval;
int64_t useconds = 0;
const char* pTokenEnd = pCxt->pSql;
if (pToken->type == TK_NOW) {
useconds = taosGetTimestamp(timePrec);
} else if (strncmp(pToken->z, "0", 1) == 0 && pToken->n == 1) {
// do nothing
} else if (pToken->type == TK_INTEGER) {
useconds = taosStr2int64(pToken->z);
} else {
// strptime("2001-11-12 18:31:01", "%Y-%m-%d %H:%M:%S", &tm);
if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp format", pToken->z);
}
return TSDB_CODE_SUCCESS;
}
for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
if (pToken->z[k] == ',') {
pCxt->pSql = pTokenEnd;
*time = useconds;
return 0;
}
break;
}
/*
* time expression:
* e.g., now+12a, now-5h
*/
SToken valueToken;
index = 0;
sToken = tStrGetToken(pTokenEnd, &index, false);
pTokenEnd += index;
if (sToken.type == TK_MINUS || sToken.type == TK_PLUS) {
index = 0;
valueToken = tStrGetToken(pTokenEnd, &index, false);
pTokenEnd += index;
if (valueToken.n < 2) {
return buildSyntaxErrMsg(&pCxt->msg, "value expected in timestamp", sToken.z);
}
char unit = 0;
if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (sToken.type == TK_PLUS) {
useconds += interval;
} else {
useconds = useconds - interval;
}
pCxt->pSql = pTokenEnd;
}
*time = useconds;
return TSDB_CODE_SUCCESS;
}
typedef int32_t (*FRowAppend)(const void *value, int32_t len, void *param);
typedef struct SKvParam {
char buf[TSDB_MAX_TAGS_LEN];
SKVRowBuilder* builder;
SSchema* schema;
} SKvParam;
static FORCE_INLINE int32_t KvRowAppend(const void *value, int32_t len, void *param) {
SKvParam* pa = (SKvParam*)param;
if (TSDB_DATA_TYPE_BINARY == pa->schema->type) {
STR_WITH_SIZE_TO_VARSTR(pa->buf, value, len);
tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, pa->buf);
} else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
if (!taosMbsToUcs4(value, len, varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
varDataSetLen(pa->buf, output);
tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, pa->buf);
} else {
tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, value);
}
return TSDB_CODE_SUCCESS;
}
typedef struct SMemParam {
SMemRow row;
SSchema* schema;
int32_t toffset;
uint8_t compareStat;
int32_t dataLen;
int32_t kvLen;
} SMemParam;
static FORCE_INLINE int32_t MemRowAppend(const void *value, int32_t len, void *param) {
SMemParam* pa = (SMemParam*)param;
if (TSDB_DATA_TYPE_BINARY == pa->schema->type) {
char *rowEnd = memRowEnd(pa->row);
STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
appendMemRowColValEx(pa->row, rowEnd, true, pa->schema->colId, pa->schema->type, pa->toffset, &pa->dataLen, &pa->kvLen, pa->compareStat);
} else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
char * rowEnd = memRowEnd(pa->row);
if (!taosMbsToUcs4(value, len, (char *)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
varDataSetLen(rowEnd, output);
appendMemRowColValEx(pa->row, rowEnd, false, pa->schema->colId, pa->schema->type, pa->toffset, &pa->dataLen, &pa->kvLen, pa->compareStat);
} else {
appendMemRowColValEx(pa->row, value, true, pa->schema->colId, pa->schema->type, pa->toffset, &pa->dataLen, &pa->kvLen, pa->compareStat);
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t checkAndTrimValue(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, char* tmpTokenBuf) {
int16_t type = pToken->type;
if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) ||
(pToken->n == 0) || (type == TK_RP)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pToken->z);
}
if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid numeric data", pToken->z);
}
// Remove quotation marks
if (TK_STRING == type) {
if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
return buildSyntaxErrMsg(&pCxt->msg, "too long string", pToken->z);
}
// delete escape character: \\, \', \"
char delim = pToken->z[0];
int32_t cnt = 0;
int32_t j = 0;
for (uint32_t k = 1; k < pToken->n - 1; ++k) {
if (pToken->z[k] == '\\' || (pToken->z[k] == delim && pToken->z[k + 1] == delim)) {
tmpTokenBuf[j] = pToken->z[k + 1];
cnt++;
j++;
k++;
continue;
}
tmpTokenBuf[j] = pToken->z[k];
j++;
}
tmpTokenBuf[j] = 0;
pToken->z = tmpTokenBuf;
pToken->n -= 2 + cnt;
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t parseOneValue(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, FRowAppend func, void* param) {
int64_t iv;
int32_t ret;
char * endptr = NULL;
CHECK_CODE(checkAndTrimValue(pCxt, pToken, pSchema, tmpTokenBuf));
if (isNullStr(pToken)) {
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
int64_t tmpVal = 0;
return func(&tmpVal, pSchema->bytes, param);
}
return func(getNullValue(pSchema->type), 0, param);
}
switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) {
return func(&TRUE_VALUE, pSchema->bytes, param);
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
return func(&FALSE_VALUE, pSchema->bytes, param);
} else {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_INTEGER) {
return func(((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
} else if (pToken->type == TK_FLOAT) {
return func(((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
} else {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
}
break;
}
case TSDB_DATA_TYPE_TINYINT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z);
} else if (!IS_VALID_TINYINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "data overflow", pToken->z);
}
uint8_t tmpVal = (uint8_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UTINYINT:{
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z);
} else if (!IS_VALID_UTINYINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z);
}
uint8_t tmpVal = (uint8_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_SMALLINT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z);
} else if (!IS_VALID_SMALLINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z);
}
int16_t tmpVal = (int16_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_USMALLINT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z);
} else if (!IS_VALID_USMALLINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z);
}
uint16_t tmpVal = (uint16_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_INT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z);
} else if (!IS_VALID_INT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z);
}
int32_t tmpVal = (int32_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UINT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z);
} else if (!IS_VALID_UINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z);
}
uint32_t tmpVal = (uint32_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_BIGINT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z);
} else if (!IS_VALID_BIGINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "bigint data overflow", pToken->z);
}
return func(&iv, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UBIGINT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z);
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned bigint data overflow", pToken->z);
}
uint64_t tmpVal = (uint64_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_FLOAT: {
double dv;
if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
}
float tmpVal = (float)dv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_DOUBLE: {
double dv;
if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
}
return func(&dv, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_BINARY: {
// too long values will return invalid sql, not be truncated automatically
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
return buildSyntaxErrMsg(&pCxt->msg, "string data overflow", pToken->z);
}
return func(pToken->z, pToken->n, param);
}
case TSDB_DATA_TYPE_NCHAR: {
return func(pToken->z, pToken->n, param);
}
case TSDB_DATA_TYPE_TIMESTAMP: {
int64_t tmpVal;
if (parseTime(pCxt, pToken, timePrec, &tmpVal) != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z);
}
return func(&tmpVal, pSchema->bytes, param);
}
}
return TSDB_CODE_FAILED;
}
// pSql -> tag1_name, ...)
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
int32_t nCols = pColList->numOfCols;
pColList->numOfBound = 0;
memset(pColList->boundedColumns, 0, sizeof(int32_t) * nCols);
for (int32_t i = 0; i < nCols; ++i) {
pColList->cols[i].valStat = VAL_STAT_NONE;
}
SToken sToken;
bool isOrdered = true;
int32_t lastColIdx = -1; // last column found
while (1) {
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_RP == sToken.type) {
break;
}
int32_t t = lastColIdx + 1;
int32_t index = findCol(&sToken, t, nCols, pSchema);
if (index < 0 && t > 0) {
index = findCol(&sToken, 0, t, pSchema);
isOrdered = false;
}
if (index < 0) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid column/tag name", sToken.z);
}
if (pColList->cols[index].valStat == VAL_STAT_HAS) {
return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
}
lastColIdx = index;
pColList->cols[index].valStat = VAL_STAT_HAS;
pColList->boundedColumns[pColList->numOfBound] = index;
++pColList->numOfBound;
}
pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;
if (!isOrdered) {
pColList->colIdxInfo = calloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
if (NULL == pColList->colIdxInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
for (uint16_t i = 0; i < pColList->numOfBound; ++i) {
pColIdx[i].schemaColIdx = (uint16_t)pColList->boundedColumns[i];
pColIdx[i].boundIdx = i;
}
qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
for (uint16_t i = 0; i < pColList->numOfBound; ++i) {
pColIdx[i].finalIdx = i;
}
qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
}
memset(&pColList->boundedColumns[pColList->numOfBound], 0, sizeof(int32_t) * (pColList->numOfCols - pColList->numOfBound));
return TSDB_CODE_SUCCESS;
}
// pSql -> tag1_value, ...)
static int32_t parseTagsClause(SInsertParseContext* pCxt, SParsedDataColInfo* pSpd, SSchema* pTagsSchema, uint8_t precision) {
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
destroyBoundColumnInfo(pSpd);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SKvParam param = {.builder = &kvRowBuilder};
SToken sToken;
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
for (int i = 0; i < pSpd->numOfBound; ++i) {
NEXT_TOKEN(pCxt->pSql, sToken);
SSchema* pSchema = &pTagsSchema[pSpd->boundedColumns[i]];
param.schema = pSchema;
CHECK_CODE_2(parseOneValue(pCxt, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, &param), tdDestroyKVRowBuilder(&kvRowBuilder), destroyBoundColumnInfo(pSpd));
}
destroyBoundColumnInfo(pSpd);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
if (NULL == row) {
return buildInvalidOperationMsg(&pCxt->msg, "tag value expected");
}
tdSortKVRowByColIdx(row);
// todo construct payload
tfree(row);
}
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) {
SToken sToken;
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
NEXT_TOKEN(pCxt->pSql, sToken);
CHECK_CODE(getTableMeta(pCxt, &sToken));
if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
}
SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
SParsedDataColInfo spd = {0};
setBoundColumnInfo(&spd, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
// pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_LP == sToken.type) {
CHECK_CODE_1(parseBoundColumns(pCxt, &spd, pTagsSchema), destroyBoundColumnInfo(&spd));
NEXT_TOKEN(pCxt->pSql, sToken);
}
if (TK_TAGS != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
}
// pSql -> (tag1_value, ...)
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_LP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
}
CHECK_CODE(parseTagsClause(pCxt, &spd, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision));
return TSDB_CODE_SUCCESS;
}
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len, char* tmpTokenBuf) {
SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo;
SMemRowBuilder* pBuilder = &pDataBlocks->rowBuilder;
char *row = pDataBlocks->pData + pDataBlocks->size; // skip the SSubmitBlk header
initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound);
bool isParseBindParam = false;
SSchema* schema = getTableColumnSchema(pDataBlocks->pTableMeta);
SMemParam param = {.row = row};
SToken sToken = {0};
// 1. set the parsed value from sql string
for (int i = 0; i < spd->numOfBound; ++i) {
NEXT_TOKEN(pCxt->pSql, sToken);
// todo bind param
SSchema *pSchema = &schema[spd->boundedColumns[i]];
param.schema = pSchema;
param.compareStat = pBuilder->compareStat;
getMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &param.toffset);
CHECK_CODE(parseOneValue(pCxt, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param));
if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
TSKEY tsKey = memRowKey(row);
if (checkTimestamp(pDataBlocks, (const char *)&tsKey) != TSDB_CODE_SUCCESS) {
buildSyntaxErrMsg(&pCxt->msg, "client time/server time can not be mixed up", sToken.z);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
}
}
if (!isParseBindParam) {
// 2. check and set convert flag
if (pBuilder->compareStat == ROW_COMPARE_NEED) {
convertMemRow(row, spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE, pBuilder->kvRowInitLen);
}
// 3. set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow(row)) {
SDataRow dataRow = memRowDataBody(row);
for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (spd->cols[i].valStat == VAL_STAT_NONE) {
tdAppendDataColVal(dataRow, getNullValue(schema[i].type), true, schema[i].type, spd->cols[i].toffset);
}
}
}
}
*len = getExtendedRowSize(pDataBlocks);
return TSDB_CODE_SUCCESS;
}
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) {
STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta);
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
CHECK_CODE(initMemRowBuilder(&pDataBlock->rowBuilder, 0, tinfo.numOfColumns, pDataBlock->boundColumnInfo.numOfBound, pDataBlock->boundColumnInfo.allNullLen));
(*numOfRows) = 0;
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
SToken sToken;
while (1) {
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_LP != sToken.type) {
break;
}
if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
int32_t tSize;
CHECK_CODE(allocateMemIfNeed(pDataBlock, extendedRowSize, &tSize));
ASSERT(tSize >= maxRows);
maxRows = tSize;
}
int32_t len = 0;
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &len, tmpTokenBuf));
pDataBlock->size += len;
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_RP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z);
}
(*numOfRows)++;
}
if (0 == (*numOfRows)) {
return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
}
return TSDB_CODE_SUCCESS;
}
static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* dataBuf) {
int32_t maxNumOfRows;
CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows));
int32_t numOfRows = 0;
CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows));
for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) {
SParamInfo *param = dataBuf->params + i;
if (param->idx == -1) {
// param->idx = pInsertParam->numOfParams++;
param->offset -= sizeof(SSubmitBlk);
}
}
SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf->pTableMeta, numOfRows)) {
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
}
dataBuf->numOfTables = 1;
pCxt->totalNum += numOfRows;
return TSDB_CODE_SUCCESS;
}
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
// [(field1_name, ...)]
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
while (1) {
SToken sToken;
// pSql -> tb_name ...
NEXT_TOKEN(pCxt->pSql, sToken);
// no data in the sql string anymore.
if (sToken.n == 0) {
if (0 == pCxt->totalNum) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
break;
}
SToken tbnameToken = sToken;
NEXT_TOKEN(pCxt->pSql, sToken);
// USING cluase
if (TK_USING == sToken.type) {
CHECK_CODE(parseUsingClause(pCxt, &tbnameToken));
NEXT_TOKEN(pCxt->pSql, sToken);
} else {
CHECK_CODE(getTableMeta(pCxt, &sToken));
}
STableDataBlocks *dataBuf = NULL;
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, NULL/* tbname */, pCxt->pTableMeta, &dataBuf, NULL));
if (TK_LP == sToken.type) {
// pSql -> field1_name, ...)
CHECK_CODE_1(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)), destroyBoundColumnInfo(&dataBuf->boundColumnInfo));
NEXT_TOKEN(pCxt->pSql, sToken);
}
if (TK_VALUES == sToken.type) {
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
CHECK_CODE(parseValuesClause(pCxt, dataBuf));
continue;
}
// FILE csv_file_path
if (TK_FILE == sToken.type) {
// pSql -> csv_file_path
NEXT_TOKEN(pCxt->pSql, sToken);
if (0 == sToken.n || (TK_STRING != sToken.type && TK_ID != sToken.type)) {
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
}
// todo
continue;
}
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
}
// merge according to vgId
if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->schemaAttache, pCxt->pOutput->payloadType, true));
}
return TSDB_CODE_SUCCESS;
}
// INSERT INTO
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
// [(field1_name, ...)]
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
CHECK_CODE(createInsertStmtInfo(pInfo));
SInsertParseContext context = {
.pComCxt = pContext,
.pSql = pContext->pSql,
.msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
.pCatalog = getCatalogHandle(pContext->pEpSet),
.pTableMeta = NULL,
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
.totalNum = 0,
.pOutput = *pInfo
};
if (NULL == context.pTableBlockHashObj) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
CHECK_CODE(skipInsertInto(&context));
CHECK_CODE(parseInsertBody(&context));
return TSDB_CODE_SUCCESS;
}
......@@ -18,6 +18,7 @@
#include "parserUtil.h"
#include "ttoken.h"
#include "function.h"
#include "insertParser.h"
bool qIsInsertSql(const char* pStr, size_t length) {
int32_t index = 0;
......@@ -46,8 +47,8 @@ int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo**
return qParserValidateSqlNode(pCatalog, &info, *pQueryInfo, id, msg, msgLen);
}
int32_t qParseInsertSql(const char* pStr, size_t length, struct SInsertStmtInfo** pInsertInfo, int64_t id, char* msg, int32_t msgLen) {
return 0;
int32_t qParseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
return parseInsertSql(pContext, pInfo);
}
int32_t qParserConvertSql(const char* pStr, size_t length, char** pConvertSql) {
......@@ -173,7 +174,7 @@ int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMet
assert(t != NULL);
if (t->n >= TSDB_FUNC_NAME_LEN) {
return buildSyntaxErrMsg(msg, msgBufLen, "too long function name", t->z);
return buildSyntaxErrMsg(&msgBuf, "too long function name", t->z);
}
// Let's assume that it is an UDF/UDAF, if it is not a built-in function.
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "parserUtil.h"
#include "taosmsg.h"
#include "parser.h"
#include "taoserror.h"
......@@ -18,7 +34,6 @@ typedef struct STableFilterCond {
static STableMetaInfo* addTableMetaInfo(SQueryStmtInfo* pQueryInfo, SName* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables);
STableMeta* tableMetaDup(STableMeta* pTableMeta);
int32_t parserValidateIdToken(SToken* pToken) {
if (pToken == NULL || pToken->z == NULL || pToken->type != TK_ID) {
......@@ -87,7 +102,7 @@ int32_t buildInvalidOperationMsg(SMsgBuf* pBuf, const char* msg) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalInfo, const char* sourceStr) {
int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr) {
const char* msgFormat1 = "syntax error near \'%s\'";
const char* msgFormat2 = "syntax error near \'%s\' (%s)";
const char* msgFormat3 = "%s";
......@@ -95,7 +110,7 @@ int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalIn
const char* prefix = "syntax error";
if (sourceStr == NULL) {
assert(additionalInfo != NULL);
snprintf(dst, dstBufLen, msgFormat1, additionalInfo);
snprintf(pBuf->buf, pBuf->len, msgFormat1, additionalInfo);
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
......@@ -103,10 +118,10 @@ int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalIn
strncpy(buf, sourceStr, tListLen(buf) - 1);
if (additionalInfo != NULL) {
snprintf(dst, dstBufLen, msgFormat2, buf, additionalInfo);
snprintf(pBuf->buf, pBuf->len, msgFormat2, buf, additionalInfo);
} else {
const char* msgFormat = (0 == strncmp(sourceStr, prefix, strlen(prefix))) ? msgFormat3 : msgFormat1;
snprintf(dst, dstBufLen, msgFormat, buf);
snprintf(pBuf->buf, pBuf->len, msgFormat, buf);
}
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
......@@ -1490,7 +1505,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild) {
return pTableMeta;
}
uint32_t getTableMetaSize(STableMeta* pTableMeta) {
uint32_t getTableMetaSize(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
int32_t totalCols = 0;
......@@ -1505,7 +1520,7 @@ uint32_t getTableMetaMaxSize() {
return sizeof(STableMeta) + TSDB_MAX_COLUMNS * sizeof(SSchema);
}
STableMeta* tableMetaDup(STableMeta* pTableMeta) {
STableMeta* tableMetaDup(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
size_t size = getTableMetaSize(pTableMeta);
......
......@@ -284,7 +284,7 @@ static int32_t tKeywordCode(const char* z, int n) {
* Return the length of the token that begins at z[0].
* Store the token type in *type before returning.
*/
uint32_t tGetToken(char* z, uint32_t* tokenId) {
uint32_t tGetToken(const char* z, uint32_t* tokenId) {
uint32_t i;
switch (*z) {
case ' ':
......@@ -595,7 +595,7 @@ SToken tscReplaceStrToken(char **str, SToken *token, const char* newToken) {
return ntoken;
}
SToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) {
SToken t0 = {0};
// here we reach the end of sql string, null-terminated string
......@@ -689,13 +689,12 @@ void taosCleanupKeywordsTable() {
}
SToken taosTokenDup(SToken* pToken, char* buf, int32_t len) {
assert(pToken != NULL && buf != NULL);
assert(pToken != NULL && buf != NULL && len > pToken->n);
strncpy(buf, pToken->z, pToken->n);
buf[pToken->n] = 0;
SToken token = *pToken;
token.z = buf;
assert(len > token.n);
strncpy(token.z, pToken->z, pToken->n);
token.z[token.n] = 0;
return token;
}
......@@ -37,7 +37,7 @@ char *taosCharsetReplace(char *charsetstr) {
return strdup(charsetstr);
}
int64_t taosStr2int64(char *str) {
int64_t taosStr2int64(const char *str) {
char *endptr = NULL;
return strtoll(str, &endptr, 10);
}
......@@ -107,7 +107,7 @@ int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs) {
return len;
}
bool taosMbsToUcs4(char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len, int32_t *len) {
bool taosMbsToUcs4(const char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len, int32_t *len) {
memset(ucs4, 0, ucs4_max_len);
mbstate_t state = {0};
int32_t retlen = mbsnrtowcs((wchar_t *)ucs4, (const char **)&mbs, mbsLength, ucs4_max_len / 4, &state);
......
......@@ -166,7 +166,7 @@ char **strsplit(char *z, const char *delim, int32_t *num) {
return split;
}
char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) {
char *strnchr(const char *haystack, char needle, int32_t len, bool skipquote) {
for (int32_t i = 0; i < len; ++i) {
// skip the needle in quote, jump to the end of quoted string
......@@ -179,7 +179,7 @@ char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) {
}
if (haystack[i] == needle) {
return &haystack[i];
return (char *)&haystack[i];
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册