提交 1f4c2cf5 编写于 作者: H Haojun Liao

[td-225]refactor.

上级 b74d5b72
...@@ -36,19 +36,6 @@ extern "C" { ...@@ -36,19 +36,6 @@ extern "C" {
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\ #define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo))) (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
typedef struct SParsedColElem {
int16_t colIndex;
uint16_t offset;
} SParsedColElem;
typedef struct SParsedDataColInfo {
int16_t numOfCols;
int16_t numOfAssignedCols;
SParsedColElem elems[TSDB_MAX_COLUMNS];
bool hasVal[TSDB_MAX_COLUMNS];
} SParsedDataColInfo;
#pragma pack(push,1) #pragma pack(push,1)
// this struct is transfered as binary, padding two bytes to avoid // this struct is transfered as binary, padding two bytes to avoid
// an 'uid' whose low bytes is 0xff being recoginized as NULL, // an 'uid' whose low bytes is 0xff being recoginized as NULL,
...@@ -118,6 +105,8 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff ...@@ -118,6 +105,8 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf); void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes, SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset); uint32_t offset);
......
...@@ -175,6 +175,19 @@ typedef struct SParamInfo { ...@@ -175,6 +175,19 @@ typedef struct SParamInfo {
uint32_t offset; uint32_t offset;
} SParamInfo; } SParamInfo;
typedef struct SBoundColumn {
bool hasVal; // denote if current column has bound or not
int32_t offset; // all column offset value
} SBoundColumn;
typedef struct SParsedDataColInfo {
int16_t numOfCols;
int16_t numOfBound;
int32_t *boundedColumns;
SBoundColumn *cols;
} SParsedDataColInfo;
typedef struct STableDataBlocks { typedef struct STableDataBlocks {
SName tableName; SName tableName;
int8_t tsSource; // where does the UNIX timestamp come from, server or client int8_t tsSource; // where does the UNIX timestamp come from, server or client
...@@ -189,6 +202,8 @@ typedef struct STableDataBlocks { ...@@ -189,6 +202,8 @@ typedef struct STableDataBlocks {
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
char *pData; char *pData;
SParsedDataColInfo boundColumnInfo;
// for parameter ('?') binding // for parameter ('?') binding
uint32_t numOfAllocedParams; uint32_t numOfAllocedParams;
uint32_t numOfParams; uint32_t numOfParams;
...@@ -462,6 +477,7 @@ char* tscGetSqlStr(SSqlObj* pSql); ...@@ -462,6 +477,7 @@ char* tscGetSqlStr(SSqlObj* pSql);
bool tscIsQueryWithLimit(SSqlObj* pSql); bool tscIsQueryWithLimit(SSqlObj* pSql);
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd); char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
......
...@@ -40,6 +40,7 @@ enum { ...@@ -40,6 +40,7 @@ enum {
}; };
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows); static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
static int32_t parseBoundColumns(SSqlCmd* pCmd, SParsedDataColInfo* pColInfo, SSchema* pSchema, char* str, char** end);
static int32_t tscToDouble(SStrToken *pToken, double *value, char **endPtr) { static int32_t tscToDouble(SStrToken *pToken, double *value, char **endPtr) {
errno = 0; errno = 0;
...@@ -94,12 +95,12 @@ int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int1 ...@@ -94,12 +95,12 @@ int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int1
*/ */
SStrToken valueToken; SStrToken valueToken;
index = 0; index = 0;
sToken = tStrGetToken(pTokenEnd, &index, false, 0, NULL); sToken = tStrGetToken(pTokenEnd, &index, false);
pTokenEnd += index; pTokenEnd += index;
if (sToken.type == TK_MINUS || sToken.type == TK_PLUS) { if (sToken.type == TK_MINUS || sToken.type == TK_PLUS) {
index = 0; index = 0;
valueToken = tStrGetToken(pTokenEnd, &index, false, 0, NULL); valueToken = tStrGetToken(pTokenEnd, &index, false);
pTokenEnd += index; pTokenEnd += index;
if (valueToken.n < 2) { if (valueToken.n < 2) {
...@@ -127,13 +128,12 @@ int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int1 ...@@ -127,13 +128,12 @@ int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int1
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo extract the null value check
static bool isNullStr(SStrToken* pToken) { static bool isNullStr(SStrToken* pToken) {
return (pToken->type == TK_NULL) || ((pToken->type == TK_STRING) && (pToken->n != 0) && return (pToken->type == TK_NULL) || ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)); (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
} }
int32_t tsParseOneColumnData(SSchema *pSchema, SStrToken *pToken, char *payload, char *msg, char **str, bool primaryKey, int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, char *msg, char **str, bool primaryKey,
int16_t timePrec) { int16_t timePrec) {
int64_t iv; int64_t iv;
int32_t ret; int32_t ret;
char *endptr = NULL; char *endptr = NULL;
...@@ -417,29 +417,32 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start ...@@ -417,29 +417,32 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, SSqlCmd* pCmd, int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, SSqlCmd *pCmd, int16_t timePrec, int32_t *len,
int16_t timePrec, int32_t *code, char *tmpTokenBuf) { char *tmpTokenBuf) {
int32_t index = 0; int32_t index = 0;
SStrToken sToken = {0}; SStrToken sToken = {0};
char * payload = pDataBlocks->pData + pDataBlocks->size; char *payload = pDataBlocks->pData + pDataBlocks->size;
SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo;
SSchema *schema = tscGetTableSchema(pDataBlocks->pTableMeta);
// 1. set the parsed value from sql string // 1. set the parsed value from sql string
int32_t rowSize = 0; int32_t rowSize = 0;
for (int i = 0; i < spd->numOfAssignedCols; ++i) { for (int i = 0; i < spd->numOfBound; ++i) {
// the start position in data block buffer of current value in sql // the start position in data block buffer of current value in sql
char * start = payload + spd->elems[i].offset; int32_t colIndex = spd->boundedColumns[i];
int16_t colIndex = spd->elems[i].colIndex;
SSchema *pSchema = schema + colIndex; char *start = payload + spd->cols[colIndex].offset;
SSchema *pSchema = &schema[colIndex];
rowSize += pSchema->bytes; rowSize += pSchema->bytes;
index = 0; index = 0;
sToken = tStrGetToken(*str, &index, true, 0, NULL); sToken = tStrGetToken(*str, &index, true);
*str += index; *str += index;
if (sToken.type == TK_QUESTION) { if (sToken.type == TK_QUESTION) {
if (pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) { if (pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
*code = tscSQLSyntaxErrMsg(pCmd->payload, "? only allowed in binding insertion", *str); return tscSQLSyntaxErrMsg(pCmd->payload, "? only allowed in binding insertion", *str);
return -1;
} }
uint32_t offset = (uint32_t)(start - pDataBlocks->pData); uint32_t offset = (uint32_t)(start - pDataBlocks->pData);
...@@ -448,15 +451,13 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ ...@@ -448,15 +451,13 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
} }
strcpy(pCmd->payload, "client out of memory"); strcpy(pCmd->payload, "client out of memory");
*code = TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1;
} }
int16_t type = sToken.type; int16_t type = sToken.type;
if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || (sToken.n == 0) || (type == TK_RP)) { type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || (sToken.n == 0) || (type == TK_RP)) {
*code = tscSQLSyntaxErrMsg(pCmd->payload, "invalid data or symbol", sToken.z); return tscSQLSyntaxErrMsg(pCmd->payload, "invalid data or symbol", sToken.z);
return -1;
} }
// Remove quotation marks // Remove quotation marks
...@@ -485,26 +486,23 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ ...@@ -485,26 +486,23 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
} }
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, pCmd->payload, str, isPrimaryKey, timePrec); int32_t ret = tsParseOneColumn(pSchema, &sToken, start, pCmd->payload, str, isPrimaryKey, timePrec);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
*code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return ret;
return -1; // NOTE: here 0 mean error!
} }
if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) { if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) {
tscInvalidSQLErrMsg(pCmd->payload, "client time/server time can not be mixed up", sToken.z); tscInvalidSQLErrMsg(pCmd->payload, "client time/server time can not be mixed up", sToken.z);
*code = TSDB_CODE_TSC_INVALID_TIME_STAMP; return TSDB_CODE_TSC_INVALID_TIME_STAMP;
return -1;
} }
} }
// 2. set the null value for the columns that do not assign values // 2. set the null value for the columns that do not assign values
if (spd->numOfAssignedCols < spd->numOfCols) { if (spd->numOfBound < spd->numOfCols) {
char *ptr = payload; char *ptr = payload;
for (int32_t i = 0; i < spd->numOfCols; ++i) { for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (!spd->cols[i].hasVal) { // current column do not have any value to insert, set it to null
if (!spd->hasVal[i]) { // current column do not have any value to insert, set it to null
if (schema[i].type == TSDB_DATA_TYPE_BINARY) { if (schema[i].type == TSDB_DATA_TYPE_BINARY) {
varDataSetLen(ptr, sizeof(int8_t)); varDataSetLen(ptr, sizeof(int8_t));
*(uint8_t*) varDataVal(ptr) = TSDB_DATA_BINARY_NULL; *(uint8_t*) varDataVal(ptr) = TSDB_DATA_BINARY_NULL;
...@@ -522,7 +520,8 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ ...@@ -522,7 +520,8 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
rowSize = (int32_t)(ptr - payload); rowSize = (int32_t)(ptr - payload);
} }
return rowSize; *len = rowSize;
return TSDB_CODE_SUCCESS;
} }
static int32_t rowDataCompar(const void *lhs, const void *rhs) { static int32_t rowDataCompar(const void *lhs, const void *rhs) {
...@@ -536,80 +535,79 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) { ...@@ -536,80 +535,79 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) {
} }
} }
int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMeta, int maxRows, int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SSqlCmd* pCmd, int32_t* numOfRows, char *tmpTokenBuf) {
SParsedDataColInfo *spd, SSqlCmd* pCmd, int32_t *code, char *tmpTokenBuf) { int32_t index = 0;
int32_t index = 0; int32_t code = 0;
SStrToken sToken;
int32_t numOfRows = 0; (*numOfRows) = 0;
SSchema *pSchema = tscGetTableSchema(pTableMeta); SStrToken sToken;
STableMeta* pTableMeta = pDataBlock->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMeta);
int32_t precision = tinfo.precision; int32_t precision = tinfo.precision;
if (spd->hasVal[0] == false) {
*code = tscInvalidSQLErrMsg(pCmd->payload, "primary timestamp column can not be null", *str);
return -1;
}
while (1) { while (1) {
index = 0; index = 0;
sToken = tStrGetToken(*str, &index, false, 0, NULL); sToken = tStrGetToken(*str, &index, false);
if (sToken.n == 0 || sToken.type != TK_LP) break; if (sToken.n == 0 || sToken.type != TK_LP) break;
*str += index; *str += index;
if (numOfRows >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) { if ((*numOfRows) >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) {
int32_t tSize; int32_t tSize;
*code = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize); code = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize);
if (*code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client if (code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client
strcpy(pCmd->payload, "client out of memory"); strcpy(pCmd->payload, "client out of memory");
return -1; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
ASSERT(tSize > maxRows); ASSERT(tSize > maxRows);
maxRows = tSize; maxRows = tSize;
} }
int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, pCmd, precision, code, tmpTokenBuf); int32_t len = 0;
if (len <= 0) { // error message has been set in tsParseOneRowData code = tsParseOneRow(str, pDataBlock, pCmd, precision, &len, tmpTokenBuf);
return -1; if (code != TSDB_CODE_SUCCESS) { // error message has been set in tsParseOneRow, return directly
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
} }
pDataBlock->size += len; pDataBlock->size += len;
index = 0; index = 0;
sToken = tStrGetToken(*str, &index, false, 0, NULL); sToken = tStrGetToken(*str, &index, false);
*str += index; *str += index;
if (sToken.n == 0 || sToken.type != TK_RP) { if (sToken.n == 0 || sToken.type != TK_RP) {
tscSQLSyntaxErrMsg(pCmd->payload, ") expected", *str); tscSQLSyntaxErrMsg(pCmd->payload, ") expected", *str);
*code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return -1; return -1;
} }
numOfRows++; (*numOfRows)++;
} }
if (numOfRows <= 0) { if ((*numOfRows) <= 0) {
strcpy(pCmd->payload, "no any data points"); strcpy(pCmd->payload, "no any data points");
*code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return -1;
} else { } else {
return numOfRows; return TSDB_CODE_SUCCESS;
} }
} }
static void tscSetAssignedColumnInfo(SParsedDataColInfo *spd, SSchema *pSchema, int32_t numOfCols) { void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) {
spd->numOfCols = numOfCols; pColInfo->numOfCols = numOfCols;
spd->numOfAssignedCols = numOfCols; pColInfo->numOfBound = numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) { pColInfo->boundedColumns = calloc(pColInfo->numOfCols, sizeof(int32_t));
spd->hasVal[i] = true; pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn));
spd->elems[i].colIndex = i;
for (int32_t i = 0; i < pColInfo->numOfCols; ++i) {
if (i > 0) { if (i > 0) {
spd->elems[i].offset = spd->elems[i - 1].offset + pSchema[i - 1].bytes; pColInfo->cols[i].offset = pSchema[i - 1].bytes + pColInfo->cols[i - 1].offset;
} }
pColInfo->cols[i].hasVal = true;
pColInfo->boundedColumns[i] = i;
} }
} }
...@@ -697,33 +695,26 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) { ...@@ -697,33 +695,26 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
} }
} }
static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColInfo *spd, int32_t *totalNum) { static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, STableDataBlocks* dataBuf, int32_t *totalNum) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableComInfo tinfo = tscGetTableInfo(dataBuf->pTableMeta);
STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &dataBuf, NULL);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
int32_t maxNumOfRows; int32_t maxNumOfRows;
ret = tscAllocateMemIfNeed(dataBuf, tinfo.rowSize, &maxNumOfRows); int32_t code = tscAllocateMemIfNeed(dataBuf, tinfo.rowSize, &maxNumOfRows);
if (TSDB_CODE_SUCCESS != ret) { if (TSDB_CODE_SUCCESS != code) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
int32_t code = TSDB_CODE_TSC_INVALID_SQL; code = TSDB_CODE_TSC_INVALID_SQL;
char * tmpTokenBuf = calloc(1, 16*1024); // used for deleting Escape character: \\, \', \" char *tmpTokenBuf = calloc(1, 16*1024); // used for deleting Escape character: \\, \', \"
if (NULL == tmpTokenBuf) { if (NULL == tmpTokenBuf) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd, &code, tmpTokenBuf); int32_t numOfRows = 0;
code = tsParseValues(str, dataBuf, maxNumOfRows, pCmd, &numOfRows, tmpTokenBuf);
free(tmpTokenBuf); free(tmpTokenBuf);
if (numOfRows <= 0) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -736,25 +727,23 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColI ...@@ -736,25 +727,23 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColI
} }
SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData); SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
code = tsSetBlockInfo(pBlocks, pTableMeta, numOfRows); code = tsSetBlockInfo(pBlocks, dataBuf->pTableMeta, numOfRows);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", *str); tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", *str);
return code; return code;
} }
dataBuf->vgId = pTableMeta->vgId;
dataBuf->numOfTables = 1; dataBuf->numOfTables = 1;
*totalNum += numOfRows; *totalNum += numOfRows;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundColumn) {
int32_t index = 0; int32_t index = 0;
SStrToken sToken = {0}; SStrToken sToken = {0};
SStrToken tableToken = {0}; SStrToken tableToken = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
const int32_t TABLE_INDEX = 0; const int32_t TABLE_INDEX = 0;
const int32_t STABLE_INDEX = 1; const int32_t STABLE_INDEX = 1;
...@@ -767,38 +756,37 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -767,38 +756,37 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
// get the token of specified table // get the token of specified table
index = 0; index = 0;
tableToken = tStrGetToken(sql, &index, false, 0, NULL); tableToken = tStrGetToken(sql, &index, false);
sql += index; sql += index;
char *cstart = NULL;
char *cend = NULL;
// skip possibly exists column list // skip possibly exists column list
index = 0; index = 0;
sToken = tStrGetToken(sql, &index, false, 0, NULL); sToken = tStrGetToken(sql, &index, false);
sql += index; sql += index;
int32_t numOfColList = 0; int32_t numOfColList = 0;
bool createTable = false;
// Bind table columns list in string, skip it and continue
if (sToken.type == TK_LP) { if (sToken.type == TK_LP) {
cstart = &sToken.z[0]; *boundColumn = &sToken.z[0];
index = 0;
while (1) { while (1) {
sToken = tStrGetToken(sql, &index, false, 0, NULL); index = 0;
sToken = tStrGetToken(sql, &index, false);
if (sToken.type == TK_RP) { if (sToken.type == TK_RP) {
cend = &sToken.z[0];
break; break;
} }
sql += index;
++numOfColList; ++numOfColList;
} }
sToken = tStrGetToken(sql, &index, false, 0, NULL); sToken = tStrGetToken(sql, &index, false);
sql += index; sql += index;
} }
if (numOfColList == 0 && cstart != NULL) { if (numOfColList == 0 && (*boundColumn) != NULL) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -806,7 +794,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -806,7 +794,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
if (sToken.type == TK_USING) { // create table if not exists according to the super table if (sToken.type == TK_USING) { // create table if not exists according to the super table
index = 0; index = 0;
sToken = tStrGetToken(sql, &index, false, 0, NULL); sToken = tStrGetToken(sql, &index, false);
sql += index; sql += index;
//the source super table is moved to the secondary position of the pTableMetaInfo list //the source super table is moved to the secondary position of the pTableMetaInfo list
...@@ -835,82 +823,42 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -835,82 +823,42 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
SSchema *pTagSchema = tscGetTableTagSchema(pSTableMetaInfo->pTableMeta); SSchema *pTagSchema = tscGetTableTagSchema(pSTableMetaInfo->pTableMeta);
STableComInfo tinfo = tscGetTableInfo(pSTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pSTableMetaInfo->pTableMeta);
index = 0;
sToken = tStrGetToken(sql, &index, false, 0, NULL);
sql += index;
SParsedDataColInfo spd = {0}; SParsedDataColInfo spd = {0};
tscSetBoundColumnInfo(&spd, pTagSchema, tscGetNumOfTags(pSTableMetaInfo->pTableMeta));
uint8_t numOfTags = tscGetNumOfTags(pSTableMetaInfo->pTableMeta);
spd.numOfCols = numOfTags;
// if specify some tags column
if (sToken.type != TK_LP) {
tscSetAssignedColumnInfo(&spd, pTagSchema, numOfTags);
} else {
/* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen)
* tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn); */
int16_t offset[TSDB_MAX_COLUMNS] = {0};
for (int32_t t = 1; t < numOfTags; ++t) {
offset[t] = offset[t - 1] + pTagSchema[t - 1].bytes;
}
while (1) {
index = 0;
sToken = tStrGetToken(sql, &index, false, 0, NULL);
sql += index;
if (TK_STRING == sToken.type) {
strdequote(sToken.z);
sToken.n = (uint32_t)strtrim(sToken.z);
}
if (sToken.type == TK_RP) {
break;
}
bool findColumnIndex = false;
// todo speedup by using hash list
for (int32_t t = 0; t < numOfTags; ++t) {
if (strncmp(sToken.z, pTagSchema[t].name, sToken.n) == 0 && strlen(pTagSchema[t].name) == sToken.n) {
SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
pElem->offset = offset[t];
pElem->colIndex = t;
if (spd.hasVal[t] == true) { index = 0;
return tscInvalidSQLErrMsg(pCmd->payload, "duplicated tag name", sToken.z); sToken = tStrGetToken(sql, &index, false);
} if (sToken.type != TK_TAGS && sToken.type != TK_LP) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
spd.hasVal[t] = true; }
findColumnIndex = true;
break;
}
}
if (!findColumnIndex) { // parse the bound tags column
return tscInvalidSQLErrMsg(pCmd->payload, "invalid tag name", sToken.z); if (sToken.type == TK_LP) {
} /*
* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen)
* tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn);
*/
char* end = NULL;
code = parseBoundColumns(pCmd, &spd, pTagSchema, sql, &end);
if (code != TSDB_CODE_SUCCESS) {
return code;
} }
if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > numOfTags) { sql = end;
return tscInvalidSQLErrMsg(pCmd->payload, "tag name expected", sToken.z);
}
index = 0; index = 0; // keywords of "TAGS"
sToken = tStrGetToken(sql, &index, false, 0, NULL); sToken = tStrGetToken(sql, &index, false);
sql += index;
} else {
sql += index; sql += index;
}
if (sToken.type != TK_TAGS) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
} }
index = 0; index = 0;
sToken = tStrGetToken(sql, &index, false, 0, NULL); sToken = tStrGetToken(sql, &index, false);
sql += index; sql += index;
if (sToken.type != TK_LP) { if (sToken.type != TK_LP) {
return tscInvalidSQLErrMsg(pCmd->payload, NULL, sToken.z); return tscInvalidSQLErrMsg(pCmd->payload, "( is expected", sToken.z);
} }
SKVRowBuilder kvRowBuilder = {0}; SKVRowBuilder kvRowBuilder = {0};
...@@ -918,13 +866,11 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -918,13 +866,11 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
uint32_t ignoreTokenTypes = TK_LP; for (int i = 0; i < spd.numOfBound; ++i) {
uint32_t numOfIgnoreToken = 1; SSchema* pSchema = &pTagSchema[spd.boundedColumns[i]];
for (int i = 0; i < spd.numOfAssignedCols; ++i) {
SSchema* pSchema = pTagSchema + spd.elems[i].colIndex;
index = 0; index = 0;
sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes); sToken = tStrGetToken(sql, &index, true);
sql += index; sql += index;
if (TK_ILLEGAL == sToken.type) { if (TK_ILLEGAL == sToken.type) {
...@@ -943,7 +889,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -943,7 +889,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
} }
char tagVal[TSDB_MAX_TAGS_LEN]; char tagVal[TSDB_MAX_TAGS_LEN];
code = tsParseOneColumnData(pSchema, &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision); code = tsParseOneColumn(pSchema, &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
return code; return code;
...@@ -952,6 +898,8 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -952,6 +898,8 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal); tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
} }
tscDestroyBoundColumnInfo(&spd);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
if (row == NULL) { if (row == NULL) {
...@@ -974,7 +922,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -974,7 +922,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
pCmd->tagData.data = pTag; pCmd->tagData.data = pTag;
index = 0; index = 0;
sToken = tStrGetToken(sql, &index, false, 0, NULL); sToken = tStrGetToken(sql, &index, false);
sql += index; sql += index;
if (sToken.n == 0 || sToken.type != TK_RP) { if (sToken.n == 0 || sToken.type != TK_RP) {
return tscSQLSyntaxErrMsg(pCmd->payload, ") expected", sToken.z); return tscSQLSyntaxErrMsg(pCmd->payload, ") expected", sToken.z);
...@@ -989,33 +937,21 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -989,33 +937,21 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return ret; return ret;
} }
createTable = true;
code = tscGetTableMetaEx(pSql, pTableMetaInfo, true); code = tscGetTableMetaEx(pSql, pTableMetaInfo, true);
if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) { if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
return code; return code;
} }
} else { } else {
if (cstart != NULL) { sql = sToken.z;
sql = cstart;
} else {
sql = sToken.z;
}
code = tscGetTableMetaEx(pSql, pTableMetaInfo, false); code = tscGetTableMetaEx(pSql, pTableMetaInfo, false);
if (pCmd->curSql == NULL) { if (pCmd->curSql == NULL) {
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS); assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS);
} }
} }
int32_t len = (int32_t)(cend - cstart + 1); *sqlstr = sql;
if (cstart != NULL && createTable == true) {
/* move the column list to start position of the next accessed points */
memmove(sql - len, cstart, len);
*sqlstr = sql - len;
} else {
*sqlstr = sql;
}
if (*sqlstr == NULL) { if (*sqlstr == NULL) {
code = TSDB_CODE_TSC_INVALID_SQL; code = TSDB_CODE_TSC_INVALID_SQL;
...@@ -1043,6 +979,76 @@ static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) { ...@@ -1043,6 +979,76 @@ static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t parseBoundColumns(SSqlCmd* pCmd, SParsedDataColInfo* pColInfo, SSchema* pSchema,
char* str, char **end) {
pColInfo->numOfBound = 0;
memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * pColInfo->numOfCols);
for(int32_t i = 0; i < pColInfo->numOfCols; ++i) {
pColInfo->cols[i].hasVal = false;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t index = 0;
SStrToken sToken = tStrGetToken(str, &index, false);
str += index;
if (sToken.type != TK_LP) {
code = tscInvalidSQLErrMsg(pCmd->payload, "( is expected", sToken.z);
goto _clean;
}
while (1) {
index = 0;
sToken = tStrGetToken(str, &index, false);
str += index;
if (TK_STRING == sToken.type) {
tscDequoteAndTrimToken(&sToken);
}
if (sToken.type == TK_RP) {
if (end != NULL) { // set the end position
*end = str;
}
break;
}
bool findColumnIndex = false;
// todo speedup by using hash list
for (int32_t t = 0; t < pColInfo->numOfCols; ++t) {
if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
if (pColInfo->cols[t].hasVal == true) {
code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
goto _clean;
}
pColInfo->cols[t].hasVal = true;
pColInfo->boundedColumns[pColInfo->numOfBound] = t;
pColInfo->numOfBound += 1;
findColumnIndex = true;
break;
}
}
if (!findColumnIndex) {
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column/tag name", sToken.z);
goto _clean;
}
}
memset(&pColInfo->boundedColumns[pColInfo->numOfBound], 0 , sizeof(int32_t) * (pColInfo->numOfCols - pColInfo->numOfBound));
return TSDB_CODE_SUCCESS;
_clean:
pCmd->curSql = NULL;
pCmd->parseFinished = 1;
return code;
}
/** /**
* parse insert sql * parse insert sql
* @param pSql * @param pSql
...@@ -1083,7 +1089,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1083,7 +1089,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
while (1) { while (1) {
int32_t index = 0; int32_t index = 0;
SStrToken sToken = tStrGetToken(str, &index, false, 0, NULL); SStrToken sToken = tStrGetToken(str, &index, false);
// no data in the sql string anymore. // no data in the sql string anymore.
if (sToken.n == 0) { if (sToken.n == 0) {
...@@ -1121,7 +1127,8 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1121,7 +1127,8 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _clean; goto _clean;
} }
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) { char* bindedColumns = NULL;
if ((code = tscCheckIfCreateTable(&str, pSql, &bindedColumns)) != TSDB_CODE_SUCCESS) {
/* /*
* After retrieving the table meta from server, the sql string will be parsed from the paused position. * After retrieving the table meta from server, the sql string will be parsed from the paused position.
* And during the getTableMetaCallback function, the sql string will be parsed from the paused position. * And during the getTableMetaCallback function, the sql string will be parsed from the paused position.
...@@ -1141,7 +1148,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1141,7 +1148,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
} }
index = 0; index = 0;
sToken = tStrGetToken(str, &index, false, 0, NULL); sToken = tStrGetToken(str, &index, false);
str += index; str += index;
if (sToken.n == 0) { if (sToken.n == 0) {
...@@ -1151,21 +1158,21 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1151,21 +1158,21 @@ int tsParseInsertSql(SSqlObj *pSql) {
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (sToken.type == TK_VALUES) { if (bindedColumns == NULL) {
SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns}; STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _clean; goto _clean;
} }
/* STableDataBlocks *dataBuf = NULL;
* app here insert data in different vnodes, so we need to set the following int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
* data in another submit procedure using async insert routines sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &dataBuf, NULL);
*/ if (ret != TSDB_CODE_SUCCESS) {
code = doParseInsertStatement(pCmd, &str, &spd, &totalNum); goto _clean;
}
code = doParseInsertStatement(pCmd, &str, dataBuf, &totalNum);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _clean; goto _clean;
} }
...@@ -1175,7 +1182,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1175,7 +1182,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
} }
index = 0; index = 0;
sToken = tStrGetToken(str, &index, false, 0, NULL); sToken = tStrGetToken(str, &index, false);
if (sToken.type != TK_STRING && sToken.type != TK_ID) { if (sToken.type != TK_STRING && sToken.type != TK_ID) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
goto _clean; goto _clean;
...@@ -1199,77 +1206,38 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1199,77 +1206,38 @@ int tsParseInsertSql(SSqlObj *pSql) {
tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize); tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize);
wordfree(&full_path); wordfree(&full_path);
} else if (sToken.type == TK_LP) { } else if (bindedColumns != NULL) {
/* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */ // insert into tablename(col1, col2,..., coln) values(v1, v2,... vn);
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta; STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
SSchema * pSchema = tscGetTableSchema(pTableMeta);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _clean; goto _clean;
} }
SParsedDataColInfo spd = {0}; STableDataBlocks *dataBuf = NULL;
spd.numOfCols = tinfo.numOfColumns; int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &dataBuf, NULL);
int16_t offset[TSDB_MAX_COLUMNS] = {0}; if (ret != TSDB_CODE_SUCCESS) {
for (int32_t t = 1; t < tinfo.numOfColumns; ++t) { goto _clean;
offset[t] = offset[t - 1] + pSchema[t - 1].bytes;
} }
while (1) { SSchema* pSchema = tscGetTableSchema(pTableMeta);
index = 0; code = parseBoundColumns(pCmd, &dataBuf->boundColumnInfo, pSchema, bindedColumns, NULL);
sToken = tStrGetToken(str, &index, false, 0, NULL); if (code != TSDB_CODE_SUCCESS) {
str += index; goto _clean;
if (TK_STRING == sToken.type) {
tscDequoteAndTrimToken(&sToken);
}
if (sToken.type == TK_RP) {
break;
}
bool findColumnIndex = false;
// todo speedup by using hash list
for (int32_t t = 0; t < tinfo.numOfColumns; ++t) {
if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++];
pElem->offset = offset[t];
pElem->colIndex = t;
if (spd.hasVal[t] == true) {
code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
goto _clean;
}
spd.hasVal[t] = true;
findColumnIndex = true;
break;
}
}
if (!findColumnIndex) {
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
goto _clean;
}
} }
if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) { if (dataBuf->boundColumnInfo.cols[0].hasVal == false) {
code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "primary timestamp column can not be null", NULL);
goto _clean; goto _clean;
} }
index = 0;
sToken = tStrGetToken(str, &index, false, 0, NULL);
str += index;
if (sToken.type != TK_VALUES) { if (sToken.type != TK_VALUES) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
goto _clean; goto _clean;
} }
code = doParseInsertStatement(pCmd, &str, &spd, &totalNum); code = doParseInsertStatement(pCmd, &str, dataBuf, &totalNum);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _clean; goto _clean;
} }
...@@ -1294,7 +1262,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1294,7 +1262,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _clean; goto _clean;
_clean: _clean:
pCmd->curSql = NULL; pCmd->curSql = NULL;
pCmd->parseFinished = 1; pCmd->parseFinished = 1;
return code; return code;
} }
...@@ -1307,7 +1275,7 @@ int tsInsertInitialCheck(SSqlObj *pSql) { ...@@ -1307,7 +1275,7 @@ int tsInsertInitialCheck(SSqlObj *pSql) {
int32_t index = 0; int32_t index = 0;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SStrToken sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL); SStrToken sToken = tStrGetToken(pSql->sqlstr, &index, false);
assert(sToken.type == TK_INSERT || sToken.type == TK_IMPORT); assert(sToken.type == TK_INSERT || sToken.type == TK_IMPORT);
pCmd->count = 0; pCmd->count = 0;
...@@ -1317,7 +1285,7 @@ int tsInsertInitialCheck(SSqlObj *pSql) { ...@@ -1317,7 +1285,7 @@ int tsInsertInitialCheck(SSqlObj *pSql) {
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType); TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType);
sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL); sToken = tStrGetToken(pSql->sqlstr, &index, false);
if (sToken.type != TK_INTO) { if (sToken.type != TK_INTO) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z); return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z);
} }
...@@ -1450,12 +1418,8 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow ...@@ -1450,12 +1418,8 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSchema * pSchema = tscGetTableSchema(pTableMeta);
STableComInfo tinfo = tscGetTableInfo(pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMeta);
SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
tfree(pCmd->pTableNameList); tfree(pCmd->pTableNameList);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
...@@ -1495,8 +1459,9 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow ...@@ -1495,8 +1459,9 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
char *lineptr = line; char *lineptr = line;
strtolower(line, line); strtolower(line, line);
int32_t len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd, tinfo.precision, &code, tokenBuf); int32_t len = 0;
if (len <= 0 || pTableDataBlock->numOfParams > 0) { code = tsParseOneRow(&lineptr, pTableDataBlock, pCmd, tinfo.precision, &code, tokenBuf);
if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) {
pSql->res.code = code; pSql->res.code = code;
break; break;
} }
......
...@@ -548,6 +548,11 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -548,6 +548,11 @@ void tscFreeSqlObj(SSqlObj* pSql) {
free(pSql); free(pSql);
} }
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) {
tfree(pColInfo->boundedColumns);
tfree(pColInfo->cols);
}
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
return; return;
...@@ -568,6 +573,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { ...@@ -568,6 +573,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
} }
tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
tfree(pDataBlock); tfree(pDataBlock);
} }
...@@ -678,7 +684,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { ...@@ -678,7 +684,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
* @param dataBlocks * @param dataBlocks
* @return * @return
*/ */
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, SName* name,
STableMeta* pTableMeta, STableDataBlocks** dataBlocks) { STableMeta* pTableMeta, STableDataBlocks** dataBlocks) {
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks)); STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
if (dataBuf == NULL) { if (dataBuf == NULL) {
...@@ -686,10 +692,12 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff ...@@ -686,10 +692,12 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
dataBuf->nAllocSize = (uint32_t)initialSize; dataBuf->nAllocSize = (uint32_t)defaultSize;
dataBuf->headerSize = startOffset; // the header size will always be the startOffset value, reserved for the subumit block header dataBuf->headerSize = startOffset;
// the header size will always be the startOffset value, reserved for the subumit block header
if (dataBuf->nAllocSize <= dataBuf->headerSize) { if (dataBuf->nAllocSize <= dataBuf->headerSize) {
dataBuf->nAllocSize = dataBuf->headerSize*2; dataBuf->nAllocSize = dataBuf->headerSize * 2;
} }
dataBuf->pData = calloc(1, dataBuf->nAllocSize); dataBuf->pData = calloc(1, dataBuf->nAllocSize);
...@@ -699,25 +707,31 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff ...@@ -699,25 +707,31 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
dataBuf->ordered = true; //Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->prevTS = INT64_MIN; dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
SSchema* pSchema = tscGetTableSchema(dataBuf->pTableMeta);
tscSetBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);
dataBuf->rowSize = rowSize; dataBuf->ordered = true;
dataBuf->size = startOffset; dataBuf->prevTS = INT64_MIN;
dataBuf->rowSize = rowSize;
dataBuf->size = startOffset;
dataBuf->tsSource = -1; dataBuf->tsSource = -1;
dataBuf->vgId = dataBuf->pTableMeta->vgId;
tNameAssign(&dataBuf->tableName, name); tNameAssign(&dataBuf->tableName, name);
//Here we keep the tableMeta to avoid it to be remove by other threads. assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
assert(initialSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
*dataBlocks = dataBuf; *dataBlocks = dataBuf;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList) { SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks,
SArray* pBlockList) {
*dataBlocks = NULL; *dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id)); STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
if (t1 != NULL) { if (t1 != NULL) {
...@@ -942,7 +956,7 @@ bool tscIsInsertData(char* sqlstr) { ...@@ -942,7 +956,7 @@ bool tscIsInsertData(char* sqlstr) {
int32_t index = 0; int32_t index = 0;
do { do {
SStrToken t0 = tStrGetToken(sqlstr, &index, false, 0, NULL); SStrToken t0 = tStrGetToken(sqlstr, &index, false);
if (t0.type != TK_LP) { if (t0.type != TK_LP) {
return t0.type == TK_INSERT || t0.type == TK_IMPORT; return t0.type == TK_INSERT || t0.type == TK_IMPORT;
} }
......
...@@ -560,7 +560,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenId) { ...@@ -560,7 +560,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenId) {
return 0; return 0;
} }
SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgnoreToken, uint32_t* ignoreTokenTypes) { SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
SStrToken t0 = {0}; SStrToken t0 = {0};
// here we reach the end of sql string, null-terminated string // here we reach the end of sql string, null-terminated string
...@@ -585,7 +585,10 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn ...@@ -585,7 +585,10 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn
} }
t0.n = tSQLGetToken(&str[*i], &t0.type); t0.n = tSQLGetToken(&str[*i], &t0.type);
break;
// not support user specfied ignored symbol list
#if 0
bool ignore = false; bool ignore = false;
for (uint32_t k = 0; k < numOfIgnoreToken; k++) { for (uint32_t k = 0; k < numOfIgnoreToken; k++) {
if (t0.type == ignoreTokenTypes[k]) { if (t0.type == ignoreTokenTypes[k]) {
...@@ -597,6 +600,7 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn ...@@ -597,6 +600,7 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn
if (!ignore) { if (!ignore) {
break; break;
} }
#endif
} }
if (t0.type == TK_SEMI) { if (t0.type == TK_SEMI) {
......
...@@ -51,11 +51,9 @@ uint32_t tSQLGetToken(char *z, uint32_t *tokenType); ...@@ -51,11 +51,9 @@ uint32_t tSQLGetToken(char *z, uint32_t *tokenType);
* @param str * @param str
* @param i * @param i
* @param isPrevOptr * @param isPrevOptr
* @param numOfIgnoreToken
* @param ignoreTokenTypes
* @return * @return
*/ */
SStrToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr, uint32_t numOfIgnoreToken, uint32_t *ignoreTokenTypes); SStrToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr);
/** /**
* check if it is a keyword or not * check if it is a keyword or not
......
...@@ -204,7 +204,13 @@ if $data03 != NULL then ...@@ -204,7 +204,13 @@ if $data03 != NULL then
return -1 return -1
endi endi
sql reset query cache print ============================>TD-3366 TD-3486
sql insert into td_3366(ts, c3, c1) using mt(t1) tags(911) values('2018-1-1 11:11:11', 'new1', 12);
sql insert into td_3486(ts, c3, c1) using mt(t1) tags(-12) values('2018-1-1 11:11:11', 'new1', 12);
sql insert into ttxu(ts, c3, c1) using mt(t1) tags('-121') values('2018-1-1 11:11:11', 'new1', 12);
sql insert into tb(ts, c1, c3) using mt(t1) tags(123) values('2018-11-01 16:29:58.000', 2, 'port')
sql insert into tb values ('2018-11-01 16:29:58.000', 2, 'import', 3) sql insert into tb values ('2018-11-01 16:29:58.000', 2, 'import', 3)
sql import into tb values ('2018-11-01 16:29:58.000', 2, 'import', 3) sql import into tb values ('2018-11-01 16:29:58.000', 2, 'import', 3)
sql import into tb values ('2018-11-01 16:39:58.000', 2, 'import', 3) sql import into tb values ('2018-11-01 16:39:58.000', 2, 'import', 3)
...@@ -212,6 +218,7 @@ sql select * from tb order by ts desc ...@@ -212,6 +218,7 @@ sql select * from tb order by ts desc
if $rows != 4 then if $rows != 4 then
return -1 return -1
endi endi
if $data03 != 3 then if $data03 != 3 then
return -1 return -1
endi endi
...@@ -233,10 +240,10 @@ sql_error alter table mt add column c1 int ...@@ -233,10 +240,10 @@ sql_error alter table mt add column c1 int
# drop non-existing columns # drop non-existing columns
sql_error alter table mt drop column c9 sql_error alter table mt drop column c9
sql drop database $db #sql drop database $db
sql show databases #sql show databases
if $rows != 0 then #if $rows != 0 then
return -1 # return -1
endi #endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册