From 77b8bdd56b5933518c1894e7cfa859a874c94a26 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 11 Dec 2021 00:35:26 -0500 Subject: [PATCH] TD-11819 Parsing insert statement and assembling binary objects. --- include/common/tdataformat.h | 2 +- source/libs/parser/inc/dataBlockMgt.h | 33 +- source/libs/parser/src/dataBlockMgt.c | 39 +- source/libs/parser/src/insertParser.c | 836 +++++++------------------- 4 files changed, 253 insertions(+), 657 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index a3de2452ac..c6ef6c513f 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -544,7 +544,7 @@ void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder); void tdResetKVRowBuilder(SKVRowBuilder *pBuilder); SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder); -static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) { +static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, const void *value) { if (pBuilder->nCols >= pBuilder->tCols) { pBuilder->tCols *= 2; SColIdx* pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols); diff --git a/source/libs/parser/inc/dataBlockMgt.h b/source/libs/parser/inc/dataBlockMgt.h index d163d0b741..350610ec06 100644 --- a/source/libs/parser/inc/dataBlockMgt.h +++ b/source/libs/parser/inc/dataBlockMgt.h @@ -42,7 +42,7 @@ typedef enum ERowCompareStat { typedef struct SBoundColumn { int32_t offset; // all column offset value int32_t toffset; // first part offset for SDataRow TODO: get offset from STSchema on future - uint8_t valStat; // denote if current column bound or not(0 means has val, 1 means no val) + uint8_t valStat; // EValStat. denote if current column bound or not(0 means has val, 1 means no val) } SBoundColumn; typedef struct { @@ -63,15 +63,7 @@ typedef struct SParsedDataColInfo { int8_t orderStatus; // bound columns } SParsedDataColInfo; -typedef struct SParamInfo { - int32_t idx; - uint8_t type; - uint8_t timePrec; - int16_t bytes; - uint32_t offset; -} SParamInfo; - -typedef struct { +typedef struct SMemRowInfo { int32_t dataLen; // len of SDataRow int32_t kvLen; // len of SKVRow } SMemRowInfo; @@ -83,15 +75,13 @@ typedef struct { SMemRowInfo *rowInfo; } SMemRowBuilder; -typedef struct SBlockKeyTuple { - TSKEY skey; - void* payloadAddr; -} SBlockKeyTuple; - -typedef struct SBlockKeyInfo { - int32_t maxBytesAlloc; - SBlockKeyTuple* pKeyTuple; -} SBlockKeyInfo; +typedef struct SParamInfo { + int32_t idx; + uint8_t type; + uint8_t timePrec; + int16_t bytes; + uint32_t offset; +} SParamInfo; typedef struct STableDataBlocks { SName tableName; @@ -147,7 +137,7 @@ static FORCE_INLINE void appendMemRowColValEx(SMemRow row, const void *value, bo } static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowType, SParsedDataColInfo *spd, - int32_t idx, int32_t *toffset, int16_t *colId) { + int32_t idx, int32_t *toffset) { int32_t schemaIdx = 0; if (IS_DATA_COL_ORDERED(spd)) { schemaIdx = spd->boundedColumns[idx]; @@ -165,10 +155,9 @@ static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowTyp *toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SColIdx); } } - *colId = pSchema[schemaIdx].colId; } -static FORCE_INLINE void checkAndConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) { +static FORCE_INLINE void convertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) { if (isDataRow(row)) { if (kvLen < (dataLen * KVRatioConvert)) { memRowSetConvert(row); diff --git a/source/libs/parser/src/dataBlockMgt.c b/source/libs/parser/src/dataBlockMgt.c index e8bf9da793..4ece848888 100644 --- a/source/libs/parser/src/dataBlockMgt.c +++ b/source/libs/parser/src/dataBlockMgt.c @@ -15,20 +15,24 @@ #include "dataBlockMgt.h" -// #include "astGenerator.h" -// #include "parserInt.h" #include "catalog.h" #include "parserUtil.h" #include "queryInfoUtil.h" -// #include "ttoken.h" -// #include "function.h" -// #include "ttime.h" -// #include "tglobal.h" #include "taosmsg.h" #define IS_RAW_PAYLOAD(t) \ (((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert +typedef struct SBlockKeyTuple { + TSKEY skey; + void* payloadAddr; +} SBlockKeyTuple; + +typedef struct SBlockKeyInfo { + int32_t maxBytesAlloc; + SBlockKeyTuple* pKeyTuple; +} SBlockKeyInfo; + static int32_t rowDataCompar(const void *lhs, const void *rhs) { TSKEY left = *(TSKEY *)lhs; TSKEY right = *(TSKEY *)rhs; @@ -189,8 +193,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { * TODO: Move to tdataformat.h and refactor when STSchema available. * - fetch flen and toffset from STSChema and remove param spd */ -static FORCE_INLINE void convertToSDataRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, - SParsedDataColInfo *spd) { +static FORCE_INLINE void convertToSDataRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, SParsedDataColInfo *spd) { ASSERT(isKvRow(src)); SKVRow kvRow = memRowKvBody(src); SDataRow dataRow = memRowDataBody(dest); @@ -209,8 +212,7 @@ static FORCE_INLINE void convertToSDataRow(SMemRow dest, SMemRow src, SSchema *p } // TODO: Move to tdataformat.h and refactor when STSchema available. -static FORCE_INLINE void convertToSKVRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, int nBoundCols, - SParsedDataColInfo *spd) { +static FORCE_INLINE void convertToSKVRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, int nBoundCols, SParsedDataColInfo *spd) { ASSERT(isDataRow(src)); SDataRow dataRow = memRowDataBody(src); @@ -485,22 +487,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB } static void extractTableNameList(SHashObj* pHashObj, bool freeBlockMap) { - // pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList); - // if (pInsertParam->pTableNameList == NULL) { - // pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES); - // } - - // STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); - // int32_t i = 0; - // while(p1) { - // STableDataBlocks* pBlocks = *p1; - // pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName); - // p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1); - // } - - // if (freeBlockMap) { - // pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pInsertParam->pTableBlockHashList, false); - // } + // todo } int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap) { diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index 1748588427..0d9d6d7580 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -15,7 +15,6 @@ #include "insertParser.h" -// #include "astGenerator.h" #include "dataBlockMgt.h" #include "parserInt.h" #include "parserUtil.h" @@ -23,7 +22,6 @@ #include "tglobal.h" #include "ttime.h" #include "ttoken.h" -// #include "function.h" #include "ttypes.h" #define NEXT_TOKEN(pSql, sToken) \ @@ -308,518 +306,249 @@ static int parseTime(SInsertParseContext* pCxt, SToken *pToken, int16_t timePrec return TSDB_CODE_SUCCESS; } -static int32_t parseOneColumn(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, bool primaryKey, int16_t timePrec, char* payload) { - int64_t iv; - int32_t ret; - char *endptr = NULL; - - if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid numeric data", pToken->z); +typedef int32_t (*FRowAppend)(const void *value, int32_t len, void *param); + +typedef struct SKvParam { + char buf[TSDB_MAX_TAGS_LEN]; + SKVRowBuilder* builder; + SSchema* schema; +} SKvParam; + +static FORCE_INLINE int32_t KvRowAppend(const void *value, int32_t len, void *param) { + SKvParam* pa = (SKvParam*)param; + if (TSDB_DATA_TYPE_BINARY == pa->schema->type) { + STR_WITH_SIZE_TO_VARSTR(pa->buf, value, len); + tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, pa->buf); + } else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) { + // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' + int32_t output = 0; + if (!taosMbsToUcs4(value, len, varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) { + return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; + } + varDataSetLen(pa->buf, output); + tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, pa->buf); + } else { + tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, value); } + return TSDB_CODE_SUCCESS; +} - switch (pSchema->type) { - case TSDB_DATA_TYPE_BOOL: { - if (isNullStr(pToken)) { - *((uint8_t *)payload) = TSDB_DATA_BOOL_NULL; - } else { - if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { - if (strncmp(pToken->z, "true", pToken->n) == 0) { - *(uint8_t *)payload = TSDB_TRUE; - } else if (strncmp(pToken->z, "false", pToken->n) == 0) { - *(uint8_t *)payload = TSDB_FALSE; - } else { - return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z); - } - } else if (pToken->type == TK_INTEGER) { - iv = strtoll(pToken->z, NULL, 10); - *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE); - } else if (pToken->type == TK_FLOAT) { - double dv = strtod(pToken->z, NULL); - *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE); - } else { - return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z); - } - } - break; +typedef struct SMemParam { + SMemRow row; + SSchema* schema; + int32_t toffset; + uint8_t compareStat; + int32_t dataLen; + int32_t kvLen; +} SMemParam; + +static FORCE_INLINE int32_t MemRowAppend(const void *value, int32_t len, void *param) { + SMemParam* pa = (SMemParam*)param; + if (TSDB_DATA_TYPE_BINARY == pa->schema->type) { + char *rowEnd = memRowEnd(pa->row); + STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len); + appendMemRowColValEx(pa->row, rowEnd, true, pa->schema->colId, pa->schema->type, pa->toffset, &pa->dataLen, &pa->kvLen, pa->compareStat); + } else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) { + // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' + int32_t output = 0; + char * rowEnd = memRowEnd(pa->row); + if (!taosMbsToUcs4(value, len, (char *)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) { + return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; } - case TSDB_DATA_TYPE_TINYINT: - if (isNullStr(pToken)) { - *((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL; - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z); - } else if (!IS_VALID_TINYINT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "data overflow", pToken->z); - } - *((uint8_t *)payload) = (uint8_t)iv; - } - break; - case TSDB_DATA_TYPE_UTINYINT: - if (isNullStr(pToken)) { - *((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL; - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z); - } else if (!IS_VALID_UTINYINT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z); - } - *((uint8_t *)payload) = (uint8_t)iv; - } - break; - case TSDB_DATA_TYPE_SMALLINT: - if (isNullStr(pToken)) { - *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL; - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z); - } else if (!IS_VALID_SMALLINT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z); - } - *((int16_t *)payload) = (int16_t)iv; - } - break; - case TSDB_DATA_TYPE_USMALLINT: - if (isNullStr(pToken)) { - *((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL; - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z); - } else if (!IS_VALID_USMALLINT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z); - } - *((uint16_t *)payload) = (uint16_t)iv; - } - break; - case TSDB_DATA_TYPE_INT: - if (isNullStr(pToken)) { - *((int32_t *)payload) = TSDB_DATA_INT_NULL; - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z); - } else if (!IS_VALID_INT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z); - } - - *((int32_t *)payload) = (int32_t)iv; - } - - break; - - case TSDB_DATA_TYPE_UINT: - if (isNullStr(pToken)) { - *((uint32_t *)payload) = TSDB_DATA_UINT_NULL; - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z); - } else if (!IS_VALID_UINT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z); - } - - *((uint32_t *)payload) = (uint32_t)iv; - } - - break; - - case TSDB_DATA_TYPE_BIGINT: - if (isNullStr(pToken)) { - *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL; - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z); - } else if (!IS_VALID_BIGINT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "bigint data overflow", pToken->z); - } - - *((int64_t *)payload) = iv; - } - break; - - case TSDB_DATA_TYPE_UBIGINT: - if (isNullStr(pToken)) { - *((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL; - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z); - } else if (!IS_VALID_UBIGINT((uint64_t)iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "unsigned bigint data overflow", pToken->z); - } - - *((uint64_t *)payload) = iv; - } - break; - - case TSDB_DATA_TYPE_FLOAT: - if (isNullStr(pToken)) { - *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL; - } else { - double dv; - if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { - return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z); - } - - if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) { - return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z); - } - - SET_FLOAT_VAL(payload, dv); - } - break; - - case TSDB_DATA_TYPE_DOUBLE: - if (isNullStr(pToken)) { - *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL; - } else { - double dv; - if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { - return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z); - } - - if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) { - return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z); - } - - *((double *)payload) = dv; - } - break; - - case TSDB_DATA_TYPE_BINARY: - // binary data cannot be null-terminated char string, otherwise the last char of the string is lost - if (pToken->type == TK_NULL) { - setVardataNull(payload, TSDB_DATA_TYPE_BINARY); - } else { // too long values will return invalid sql, not be truncated automatically - if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { //todo refactor - return buildSyntaxErrMsg(&pCxt->msg, "string data overflow", pToken->z); - } - - STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n); - } + varDataSetLen(rowEnd, output); + appendMemRowColValEx(pa->row, rowEnd, false, pa->schema->colId, pa->schema->type, pa->toffset, &pa->dataLen, &pa->kvLen, pa->compareStat); + } else { + appendMemRowColValEx(pa->row, value, true, pa->schema->colId, pa->schema->type, pa->toffset, &pa->dataLen, &pa->kvLen, pa->compareStat); + } + return TSDB_CODE_SUCCESS; +} - break; +static FORCE_INLINE int32_t checkAndTrimValue(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, char* tmpTokenBuf) { + int16_t type = pToken->type; + if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && + type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || + (pToken->n == 0) || (type == TK_RP)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pToken->z); + } - case TSDB_DATA_TYPE_NCHAR: - if (pToken->type == TK_NULL) { - setVardataNull(payload, TSDB_DATA_TYPE_NCHAR); - } else { - // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' - int32_t output = 0; - if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) { - char buf[512] = {0}; - snprintf(buf, tListLen(buf), "%s", strerror(errno)); - return buildSyntaxErrMsg(&pCxt->msg, buf, pToken->z); - } - - varDataSetLen(payload, output); - } - break; + if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid numeric data", pToken->z); + } - case TSDB_DATA_TYPE_TIMESTAMP: { - if (pToken->type == TK_NULL) { - if (primaryKey) { - *((int64_t *)payload) = 0; - } else { - *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL; - } - } else { - int64_t temp; - if (parseTime(pCxt, pToken, timePrec, &temp) != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z); - } - - *((int64_t *)payload) = temp; + // Remove quotation marks + if (TK_STRING == type) { + if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) { + return buildSyntaxErrMsg(&pCxt->msg, "too long string", pToken->z); + } + // delete escape character: \\, \', \" + char delim = pToken->z[0]; + int32_t cnt = 0; + int32_t j = 0; + for (uint32_t k = 1; k < pToken->n - 1; ++k) { + if (pToken->z[k] == '\\' || (pToken->z[k] == delim && pToken->z[k + 1] == delim)) { + tmpTokenBuf[j] = pToken->z[k + 1]; + cnt++; + j++; + k++; + continue; } - - break; + tmpTokenBuf[j] = pToken->z[k]; + j++; } + tmpTokenBuf[j] = 0; + pToken->z = tmpTokenBuf; + pToken->n -= 2 + cnt; } return TSDB_CODE_SUCCESS; } -static FORCE_INLINE int32_t parseOneColumnKV(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, bool primaryKey, int16_t timePrec, - SMemRow row, int32_t toffset, int16_t colId, int32_t* dataLen, int32_t* kvLen, uint8_t compareStat) { +static FORCE_INLINE int32_t parseOneValue(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, FRowAppend func, void* param) { int64_t iv; int32_t ret; char * endptr = NULL; - if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid numeric data", pToken->z); + CHECK_CODE(checkAndTrimValue(pCxt, pToken, pSchema, tmpTokenBuf)); + + if (isNullStr(pToken)) { + if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { + int64_t tmpVal = 0; + return func(&tmpVal, pSchema->bytes, param); + } + return func(getNullValue(pSchema->type), 0, param); } switch (pSchema->type) { - case TSDB_DATA_TYPE_BOOL: { // bool - if (isNullStr(pToken)) { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { - if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { - if (strncmp(pToken->z, "true", pToken->n) == 0) { - appendMemRowColValEx(row, &TRUE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); - } else if (strncmp(pToken->z, "false", pToken->n) == 0) { - appendMemRowColValEx(row, &FALSE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { - return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z); - } - } else if (pToken->type == TK_INTEGER) { - iv = strtoll(pToken->z, NULL, 10); - appendMemRowColValEx(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset, - dataLen, kvLen, compareStat); - } else if (pToken->type == TK_FLOAT) { - double dv = strtod(pToken->z, NULL); - appendMemRowColValEx(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset, - dataLen, kvLen, compareStat); + case TSDB_DATA_TYPE_BOOL: { + if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { + if (strncmp(pToken->z, "true", pToken->n) == 0) { + return func(&TRUE_VALUE, pSchema->bytes, param); + } else if (strncmp(pToken->z, "false", pToken->n) == 0) { + return func(&FALSE_VALUE, pSchema->bytes, param); } else { return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z); } + } else if (pToken->type == TK_INTEGER) { + return func(((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); + } else if (pToken->type == TK_FLOAT) { + return func(((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); + } else { + return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z); } break; } - - case TSDB_DATA_TYPE_TINYINT: - if (isNullStr(pToken)) { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z); - } else if (!IS_VALID_TINYINT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "data overflow", pToken->z); - } - - uint8_t tmpVal = (uint8_t)iv; - appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + case TSDB_DATA_TYPE_TINYINT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z); + } else if (!IS_VALID_TINYINT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "data overflow", pToken->z); } - - break; - - case TSDB_DATA_TYPE_UTINYINT: - if (isNullStr(pToken)) { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z); - } else if (!IS_VALID_UTINYINT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z); - } - - uint8_t tmpVal = (uint8_t)iv; - appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + uint8_t tmpVal = (uint8_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_UTINYINT:{ + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z); + } else if (!IS_VALID_UTINYINT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z); } - - break; - - case TSDB_DATA_TYPE_SMALLINT: - if (isNullStr(pToken)) { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z); - } else if (!IS_VALID_SMALLINT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z); - } - - int16_t tmpVal = (int16_t)iv; - appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + uint8_t tmpVal = (uint8_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_SMALLINT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z); + } else if (!IS_VALID_SMALLINT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z); } - - break; - - case TSDB_DATA_TYPE_USMALLINT: - if (isNullStr(pToken)) { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z); - } else if (!IS_VALID_USMALLINT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z); - } - - uint16_t tmpVal = (uint16_t)iv; - appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + int16_t tmpVal = (int16_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_USMALLINT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z); + } else if (!IS_VALID_USMALLINT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z); } - - break; - - case TSDB_DATA_TYPE_INT: - if (isNullStr(pToken)) { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z); - } else if (!IS_VALID_INT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z); - } - - int32_t tmpVal = (int32_t)iv; - appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + uint16_t tmpVal = (uint16_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_INT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z); + } else if (!IS_VALID_INT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z); } - - break; - - case TSDB_DATA_TYPE_UINT: - if (isNullStr(pToken)) { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z); - } else if (!IS_VALID_UINT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z); - } - - uint32_t tmpVal = (uint32_t)iv; - appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + int32_t tmpVal = (int32_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_UINT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z); + } else if (!IS_VALID_UINT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z); } - - break; - - case TSDB_DATA_TYPE_BIGINT: - if (isNullStr(pToken)) { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z); - } else if (!IS_VALID_BIGINT(iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "bigint data overflow", pToken->z); - } - - appendMemRowColValEx(row, &iv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + uint32_t tmpVal = (uint32_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_BIGINT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z); + } else if (!IS_VALID_BIGINT(iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "bigint data overflow", pToken->z); } - break; - - case TSDB_DATA_TYPE_UBIGINT: - if (isNullStr(pToken)) { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { - ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false); - if (ret != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z); - } else if (!IS_VALID_UBIGINT((uint64_t)iv)) { - return buildSyntaxErrMsg(&pCxt->msg, "unsigned bigint data overflow", pToken->z); - } - - uint64_t tmpVal = (uint64_t)iv; - appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + return func(&iv, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_UBIGINT: { + if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z); + } else if (!IS_VALID_UBIGINT((uint64_t)iv)) { + return buildSyntaxErrMsg(&pCxt->msg, "unsigned bigint data overflow", pToken->z); } - break; - - case TSDB_DATA_TYPE_FLOAT: - if (isNullStr(pToken)) { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { - double dv; - if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { - return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z); - } - - if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || - isnan(dv)) { - return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z); - } - - float tmpVal = (float)dv; - appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + uint64_t tmpVal = (uint64_t)iv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_FLOAT: { + double dv; + if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { + return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z); } - break; - - case TSDB_DATA_TYPE_DOUBLE: - if (isNullStr(pToken)) { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { - double dv; - if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { - return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z); - } - - if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) { - return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z); - } - - appendMemRowColValEx(row, &dv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) { + return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z); } - break; - - case TSDB_DATA_TYPE_BINARY: - // binary data cannot be null-terminated char string, otherwise the last char of the string is lost - if (pToken->type == TK_NULL) { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { // too long values will return invalid sql, not be truncated automatically - if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor - return buildSyntaxErrMsg(&pCxt->msg, "string data overflow", pToken->z); - } - // STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n); - char *rowEnd = memRowEnd(row); - STR_WITH_SIZE_TO_VARSTR(rowEnd, pToken->z, pToken->n); - appendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + float tmpVal = (float)dv; + return func(&tmpVal, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_DOUBLE: { + double dv; + if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { + return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z); } - break; - - case TSDB_DATA_TYPE_NCHAR: - if (pToken->type == TK_NULL) { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } else { - // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' - int32_t output = 0; - char * rowEnd = memRowEnd(row); - if (!taosMbsToUcs4(pToken->z, pToken->n, (char *)varDataVal(rowEnd), pSchema->bytes - VARSTR_HEADER_SIZE, - &output)) { - char buf[512] = {0}; - snprintf(buf, tListLen(buf), "%s", strerror(errno)); - return buildSyntaxErrMsg(&pCxt->msg, buf, pToken->z); - } - varDataSetLen(rowEnd, output); - appendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) { + return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z); } - break; - + return func(&dv, pSchema->bytes, param); + } + case TSDB_DATA_TYPE_BINARY: { + // too long values will return invalid sql, not be truncated automatically + if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor + return buildSyntaxErrMsg(&pCxt->msg, "string data overflow", pToken->z); + } + return func(pToken->z, pToken->n, param); + } + case TSDB_DATA_TYPE_NCHAR: { + return func(pToken->z, pToken->n, param); + } case TSDB_DATA_TYPE_TIMESTAMP: { - if (pToken->type == TK_NULL) { - if (primaryKey) { - // When building SKVRow primaryKey, we should not skip even with NULL value. - int64_t tmpVal = 0; - appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); - } else { - appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); - } - } else { - int64_t tmpVal; - if (parseTime(pCxt, pToken, timePrec, &tmpVal) != TSDB_CODE_SUCCESS) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z); - } - appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + int64_t tmpVal; + if (parseTime(pCxt, pToken, timePrec, &tmpVal) != TSDB_CODE_SUCCESS) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z); } - - break; + return func(&tmpVal, pSchema->bytes, param); } } - return TSDB_CODE_SUCCESS; + return TSDB_CODE_FAILED; } // pSql -> tag1_name, ...) @@ -892,35 +621,17 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SParsedDataColInfo* pS return TSDB_CODE_TSC_OUT_OF_MEMORY; } + SKvParam param = {.builder = &kvRowBuilder}; SToken sToken; + char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" for (int i = 0; i < pSpd->numOfBound; ++i) { - SSchema* pSchema = &pTagsSchema[pSpd->boundedColumns[i]]; - NEXT_TOKEN(pCxt->pSql, sToken); - - if (TK_ILLEGAL == sToken.type) { - tdDestroyKVRowBuilder(&kvRowBuilder); - destroyBoundColumnInfo(pSpd); - return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; - } - - if (sToken.n == 0 || sToken.type == TK_RP) { - break; - } - - // Remove quotation marks - if (TK_STRING == sToken.type) { - sToken.z++; - sToken.n -= 2; - } - - char tagVal[TSDB_MAX_TAGS_LEN]; - CHECK_CODE_2(parseOneColumn(pCxt, &sToken, pSchema, false, precision, tagVal), tdDestroyKVRowBuilder(&kvRowBuilder), destroyBoundColumnInfo(pSpd)); - tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal); + SSchema* pSchema = &pTagsSchema[pSpd->boundedColumns[i]]; + param.schema = pSchema; + CHECK_CODE_2(parseOneValue(pCxt, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, ¶m), tdDestroyKVRowBuilder(&kvRowBuilder), destroyBoundColumnInfo(pSpd)); } - - destroyBoundColumnInfo(pSpd); + destroyBoundColumnInfo(pSpd); SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder); if (NULL == row) { @@ -928,19 +639,9 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SParsedDataColInfo* pS } tdSortKVRowByColIdx(row); - // pInsertParam->tagData.dataLen = kvRowLen(row); - // if (pInsertParam->tagData.dataLen <= 0){ - // return buildInvalidOperationMsg(msgBuf, "tag value expected"); - // } - - // char* pTag = realloc(pInsertParam->tagData.data, pInsertParam->tagData.dataLen); - // if (pTag == NULL) { - // return TSDB_CODE_TSC_OUT_OF_MEMORY; - // } + // todo construct payload - // kvRowCpy(pTag, row); tfree(row); - // pInsertParam->tagData.data = pTag; } // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) @@ -978,99 +679,27 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) return TSDB_CODE_SUCCESS; } -static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len, char* tmpTokenBuf) { - int32_t index = 0; - SToken sToken = {0}; - +static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len, char* tmpTokenBuf) { + SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo; + SMemRowBuilder* pBuilder = &pDataBlocks->rowBuilder; char *row = pDataBlocks->pData + pDataBlocks->size; // skip the SSubmitBlk header - - SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo; - STableMeta * pTableMeta = pDataBlocks->pTableMeta; - SSchema * schema = getTableColumnSchema(pTableMeta); - SMemRowBuilder * pBuilder = &pDataBlocks->rowBuilder; - int32_t dataLen = spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE; - int32_t kvLen = pBuilder->kvRowInitLen; - bool isParseBindParam = false; - initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound); + bool isParseBindParam = false; + SSchema* schema = getTableColumnSchema(pDataBlocks->pTableMeta); + SMemParam param = {.row = row}; + SToken sToken = {0}; // 1. set the parsed value from sql string for (int i = 0; i < spd->numOfBound; ++i) { - // the start position in data block buffer of current value in sql - int32_t colIndex = spd->boundedColumns[i]; - - char *start = row + spd->cols[colIndex].offset; - - SSchema *pSchema = &schema[colIndex]; // get colId here - NEXT_TOKEN(pCxt->pSql, sToken); - - // if (sToken.type == TK_QUESTION) { - // if (!isParseBindParam) { - // isParseBindParam = true; - // } - // if (pInsertParam->insertType != TSDB_QUERY_TYPE_STMT_INSERT) { - // return buildSyntaxErrMsg(pInsertParam->msg, "? only allowed in binding insertion", *str); - // } - - // uint32_t offset = (uint32_t)(start - pDataBlocks->pData); - // if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) { - // continue; - // } - - // strcpy(pInsertParam->msg, "client out of memory"); - // return TSDB_CODE_TSC_OUT_OF_MEMORY; - // } - - int16_t type = sToken.type; - if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && - type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || - (sToken.n == 0) || (type == TK_RP)) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", sToken.z); - } - - // Remove quotation marks - if (TK_STRING == sToken.type) { - // delete escape character: \\, \', \" - char delim = sToken.z[0]; - - int32_t cnt = 0; - int32_t j = 0; - if (sToken.n >= TSDB_MAX_BYTES_PER_ROW) { - return buildSyntaxErrMsg(&pCxt->msg, "too long string", sToken.z); - } - - for (uint32_t k = 1; k < sToken.n - 1; ++k) { - if (sToken.z[k] == '\\' || (sToken.z[k] == delim && sToken.z[k + 1] == delim)) { - tmpTokenBuf[j] = sToken.z[k + 1]; - - cnt++; - j++; - k++; - continue; - } - - tmpTokenBuf[j] = sToken.z[k]; - j++; - } - - tmpTokenBuf[j] = 0; - sToken.z = tmpTokenBuf; - sToken.n -= 2 + cnt; - } - - bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_ID); - int32_t toffset = -1; - int16_t colId = -1; - getMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &toffset, &colId); - - int32_t ret = parseOneColumnKV(pCxt, &sToken, pSchema, isPrimaryKey, timePrec, row, toffset, - colId, &dataLen, &kvLen, pBuilder->compareStat); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } - - if (isPrimaryKey) { + // todo bind param + SSchema *pSchema = &schema[spd->boundedColumns[i]]; + param.schema = pSchema; + param.compareStat = pBuilder->compareStat; + getMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, ¶m.toffset); + CHECK_CODE(parseOneValue(pCxt, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, ¶m)); + + if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { TSKEY tsKey = memRowKey(row); if (checkTimestamp(pDataBlocks, (const char *)&tsKey) != TSDB_CODE_SUCCESS) { buildSyntaxErrMsg(&pCxt->msg, "client time/server time can not be mixed up", sToken.z); @@ -1082,7 +711,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, if (!isParseBindParam) { // 2. check and set convert flag if (pBuilder->compareStat == ROW_COMPARE_NEED) { - checkAndConvertMemRow(row, dataLen, kvLen); + convertMemRow(row, spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE, pBuilder->kvRowInitLen); } // 3. set the null value for the columns that do not assign values @@ -1097,17 +726,17 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, } *len = getExtendedRowSize(pDataBlocks); - return TSDB_CODE_SUCCESS; } // pSql -> (field1_value, ...) [(field1_value2, ...) ...] -static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows, char* tmpTokenBuf) { +static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) { STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta); int32_t extendedRowSize = getExtendedRowSize(pDataBlock); CHECK_CODE(initMemRowBuilder(&pDataBlock->rowBuilder, 0, tinfo.numOfColumns, pDataBlock->boundColumnInfo.numOfBound, pDataBlock->boundColumnInfo.allNullLen)); (*numOfRows) = 0; + char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" SToken sToken; while (1) { NEXT_TOKEN(pCxt->pSql, sToken); @@ -1145,8 +774,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows)); int32_t numOfRows = 0; - char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" - CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows, tmpTokenBuf)); + CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows)); for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) { SParamInfo *param = dataBuf->params + i; @@ -1186,10 +814,9 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { } SToken tbnameToken = sToken; - NEXT_TOKEN(pCxt->pSql, sToken); - // USING cluase + // USING cluase if (TK_USING == sToken.type) { CHECK_CODE(parseUsingClause(pCxt, &tbnameToken)); NEXT_TOKEN(pCxt->pSql, sToken); @@ -1203,12 +830,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { if (TK_LP == sToken.type) { // pSql -> field1_name, ...) - NEXT_TOKEN(pCxt->pSql, sToken); - // todo col_list - NEXT_TOKEN(pCxt->pSql, sToken); - if (TK_RP != sToken.type) { - return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z); - } + CHECK_CODE_1(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)), destroyBoundColumnInfo(&dataBuf->boundColumnInfo)); NEXT_TOKEN(pCxt->pSql, sToken); } @@ -1222,11 +844,9 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { if (TK_FILE == sToken.type) { // pSql -> csv_file_path NEXT_TOKEN(pCxt->pSql, sToken); - if (0 == sToken.n || (TK_STRING != sToken.type && TK_ID != sToken.type)) { return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z); } - // todo continue; } -- GitLab