Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
15da33e0
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
15da33e0
编写于
4月 03, 2021
作者:
H
haojun Liao
提交者:
GitHub
4月 03, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #5660 from taosdata/feature/qrefactor
Feature/qrefactor
上级
edeaa569
49b78ead
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
347 addition
and
341 deletion
+347
-341
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+2
-13
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+17
-0
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+249
-283
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+44
-21
src/query/src/qTokenizer.c
src/query/src/qTokenizer.c
+5
-1
src/query/tests/resultBufferTest.cpp
src/query/tests/resultBufferTest.cpp
+3
-3
src/util/inc/tstoken.h
src/util/inc/tstoken.h
+1
-3
tests/script/general/parser/alter.sim
tests/script/general/parser/alter.sim
+13
-6
tests/script/general/parser/gendata.sh
tests/script/general/parser/gendata.sh
+6
-0
tests/script/general/parser/import_file.sim
tests/script/general/parser/import_file.sim
+7
-11
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
15da33e0
...
...
@@ -36,19 +36,6 @@ extern "C" {
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
typedef
struct
SParsedColElem
{
int16_t
colIndex
;
uint16_t
offset
;
}
SParsedColElem
;
typedef
struct
SParsedDataColInfo
{
int16_t
numOfCols
;
int16_t
numOfAssignedCols
;
SParsedColElem
elems
[
TSDB_MAX_COLUMNS
];
bool
hasVal
[
TSDB_MAX_COLUMNS
];
}
SParsedDataColInfo
;
#pragma pack(push,1)
// this struct is transfered as binary, padding two bytes to avoid
// an 'uid' whose low bytes is 0xff being recoginized as NULL,
...
...
@@ -118,6 +105,8 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
void
tscDestroyDataBlock
(
STableDataBlocks
*
pDataBlock
,
bool
removeMeta
);
void
tscSortRemoveDataBlockDupRows
(
STableDataBlocks
*
dataBuf
);
void
tscDestroyBoundColumnInfo
(
SParsedDataColInfo
*
pColInfo
);
SParamInfo
*
tscAddParamToDataBlock
(
STableDataBlocks
*
pDataBlock
,
char
type
,
uint8_t
timePrec
,
int16_t
bytes
,
uint32_t
offset
);
...
...
src/client/inc/tsclient.h
浏览文件 @
15da33e0
...
...
@@ -175,6 +175,19 @@ typedef struct SParamInfo {
uint32_t
offset
;
}
SParamInfo
;
typedef
struct
SBoundColumn
{
bool
hasVal
;
// denote if current column has bound or not
int32_t
offset
;
// all column offset value
}
SBoundColumn
;
typedef
struct
SParsedDataColInfo
{
int16_t
numOfCols
;
int16_t
numOfBound
;
int32_t
*
boundedColumns
;
SBoundColumn
*
cols
;
}
SParsedDataColInfo
;
typedef
struct
STableDataBlocks
{
SName
tableName
;
int8_t
tsSource
;
// where does the UNIX timestamp come from, server or client
...
...
@@ -189,6 +202,8 @@ typedef struct STableDataBlocks {
STableMeta
*
pTableMeta
;
// the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
char
*
pData
;
SParsedDataColInfo
boundColumnInfo
;
// for parameter ('?') binding
uint32_t
numOfAllocedParams
;
uint32_t
numOfParams
;
...
...
@@ -425,6 +440,7 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t
tscCreateResPointerInfo
(
SSqlRes
*
pRes
,
SQueryInfo
*
pQueryInfo
);
void
tscSetResRawPtr
(
SSqlRes
*
pRes
,
SQueryInfo
*
pQueryInfo
);
void
destroyTableNameList
(
SSqlCmd
*
pCmd
);
void
tscResetSqlCmd
(
SSqlCmd
*
pCmd
,
bool
removeMeta
);
...
...
@@ -462,6 +478,7 @@ char* tscGetSqlStr(SSqlObj* pSql);
bool
tscIsQueryWithLimit
(
SSqlObj
*
pSql
);
bool
tscHasReachLimitation
(
SQueryInfo
*
pQueryInfo
,
SSqlRes
*
pRes
);
void
tscSetBoundColumnInfo
(
SParsedDataColInfo
*
pColInfo
,
SSchema
*
pSchema
,
int32_t
numOfCols
);
char
*
tscGetErrorMsgPayload
(
SSqlCmd
*
pCmd
);
...
...
src/client/src/tscParseInsert.c
浏览文件 @
15da33e0
...
...
@@ -40,6 +40,7 @@ enum {
};
static
int32_t
tscAllocateMemIfNeed
(
STableDataBlocks
*
pDataBlock
,
int32_t
rowSize
,
int32_t
*
numOfRows
);
static
int32_t
parseBoundColumns
(
SSqlCmd
*
pCmd
,
SParsedDataColInfo
*
pColInfo
,
SSchema
*
pSchema
,
char
*
str
,
char
**
end
);
static
int32_t
tscToDouble
(
SStrToken
*
pToken
,
double
*
value
,
char
**
endPtr
)
{
errno
=
0
;
...
...
@@ -94,12 +95,12 @@ int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int1
*/
SStrToken
valueToken
;
index
=
0
;
sToken
=
tStrGetToken
(
pTokenEnd
,
&
index
,
false
,
0
,
NULL
);
sToken
=
tStrGetToken
(
pTokenEnd
,
&
index
,
false
);
pTokenEnd
+=
index
;
if
(
sToken
.
type
==
TK_MINUS
||
sToken
.
type
==
TK_PLUS
)
{
index
=
0
;
valueToken
=
tStrGetToken
(
pTokenEnd
,
&
index
,
false
,
0
,
NULL
);
valueToken
=
tStrGetToken
(
pTokenEnd
,
&
index
,
false
);
pTokenEnd
+=
index
;
if
(
valueToken
.
n
<
2
)
{
...
...
@@ -127,13 +128,12 @@ int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int1
return
TSDB_CODE_SUCCESS
;
}
// todo extract the null value check
static
bool
isNullStr
(
SStrToken
*
pToken
)
{
return
(
pToken
->
type
==
TK_NULL
)
||
((
pToken
->
type
==
TK_STRING
)
&&
(
pToken
->
n
!=
0
)
&&
(
strncasecmp
(
TSDB_DATA_NULL_STR_L
,
pToken
->
z
,
pToken
->
n
)
==
0
));
}
int32_t
tsParseOneColumn
Data
(
SSchema
*
pSchema
,
SStrToken
*
pToken
,
char
*
payload
,
char
*
msg
,
char
**
str
,
bool
primaryKey
,
int16_t
timePrec
)
{
int32_t
tsParseOneColumn
(
SSchema
*
pSchema
,
SStrToken
*
pToken
,
char
*
payload
,
char
*
msg
,
char
**
str
,
bool
primaryKey
,
int16_t
timePrec
)
{
int64_t
iv
;
int32_t
ret
;
char
*
endptr
=
NULL
;
...
...
@@ -417,29 +417,32 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start
return
TSDB_CODE_SUCCESS
;
}
int
tsParseOneRowData
(
char
**
str
,
STableDataBlocks
*
pDataBlocks
,
SSchema
schema
[],
SParsedDataColInfo
*
spd
,
SSqlCmd
*
pCmd
,
int16_t
timePrec
,
int32_t
*
code
,
char
*
tmpTokenBuf
)
{
int32_t
index
=
0
;
SStrToken
sToken
=
{
0
};
char
*
payload
=
pDataBlocks
->
pData
+
pDataBlocks
->
size
;
int
tsParseOneRow
(
char
**
str
,
STableDataBlocks
*
pDataBlocks
,
SSqlCmd
*
pCmd
,
int16_t
timePrec
,
int32_t
*
len
,
char
*
tmpTokenBuf
)
{
int32_t
index
=
0
;
SStrToken
sToken
=
{
0
};
char
*
payload
=
pDataBlocks
->
pData
+
pDataBlocks
->
size
;
SParsedDataColInfo
*
spd
=
&
pDataBlocks
->
boundColumnInfo
;
SSchema
*
schema
=
tscGetTableSchema
(
pDataBlocks
->
pTableMeta
);
// 1. set the parsed value from sql string
int32_t
rowSize
=
0
;
for
(
int
i
=
0
;
i
<
spd
->
numOf
AssignedCols
;
++
i
)
{
for
(
int
i
=
0
;
i
<
spd
->
numOf
Bound
;
++
i
)
{
// the start position in data block buffer of current value in sql
char
*
start
=
payload
+
spd
->
elems
[
i
].
offset
;
int16_t
colIndex
=
spd
->
elems
[
i
].
colIndex
;
SSchema
*
pSchema
=
schema
+
colIndex
;
int32_t
colIndex
=
spd
->
boundedColumns
[
i
];
char
*
start
=
payload
+
spd
->
cols
[
colIndex
].
offset
;
SSchema
*
pSchema
=
&
schema
[
colIndex
];
rowSize
+=
pSchema
->
bytes
;
index
=
0
;
sToken
=
tStrGetToken
(
*
str
,
&
index
,
true
,
0
,
NULL
);
sToken
=
tStrGetToken
(
*
str
,
&
index
,
true
);
*
str
+=
index
;
if
(
sToken
.
type
==
TK_QUESTION
)
{
if
(
pCmd
->
insertType
!=
TSDB_QUERY_TYPE_STMT_INSERT
)
{
*
code
=
tscSQLSyntaxErrMsg
(
pCmd
->
payload
,
"? only allowed in binding insertion"
,
*
str
);
return
-
1
;
return
tscSQLSyntaxErrMsg
(
pCmd
->
payload
,
"? only allowed in binding insertion"
,
*
str
);
}
uint32_t
offset
=
(
uint32_t
)(
start
-
pDataBlocks
->
pData
);
...
...
@@ -448,15 +451,13 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
}
strcpy
(
pCmd
->
payload
,
"client out of memory"
);
*
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
-
1
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
int16_t
type
=
sToken
.
type
;
if
((
type
!=
TK_NOW
&&
type
!=
TK_INTEGER
&&
type
!=
TK_STRING
&&
type
!=
TK_FLOAT
&&
type
!=
TK_BOOL
&&
type
!=
TK_NULL
&&
type
!=
TK_HEX
&&
type
!=
TK_OCT
&&
type
!=
TK_BIN
)
||
(
sToken
.
n
==
0
)
||
(
type
==
TK_RP
))
{
*
code
=
tscSQLSyntaxErrMsg
(
pCmd
->
payload
,
"invalid data or symbol"
,
sToken
.
z
);
return
-
1
;
return
tscSQLSyntaxErrMsg
(
pCmd
->
payload
,
"invalid data or symbol"
,
sToken
.
z
);
}
// Remove quotation marks
...
...
@@ -485,26 +486,23 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
}
bool
isPrimaryKey
=
(
colIndex
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
);
int32_t
ret
=
tsParseOneColumn
Data
(
pSchema
,
&
sToken
,
start
,
pCmd
->
payload
,
str
,
isPrimaryKey
,
timePrec
);
int32_t
ret
=
tsParseOneColumn
(
pSchema
,
&
sToken
,
start
,
pCmd
->
payload
,
str
,
isPrimaryKey
,
timePrec
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
*
code
=
TSDB_CODE_TSC_SQL_SYNTAX_ERROR
;
return
-
1
;
// NOTE: here 0 mean error!
return
ret
;
}
if
(
isPrimaryKey
&&
tsCheckTimestamp
(
pDataBlocks
,
start
)
!=
TSDB_CODE_SUCCESS
)
{
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"client time/server time can not be mixed up"
,
sToken
.
z
);
*
code
=
TSDB_CODE_TSC_INVALID_TIME_STAMP
;
return
-
1
;
return
TSDB_CODE_TSC_INVALID_TIME_STAMP
;
}
}
// 2. set the null value for the columns that do not assign values
if
(
spd
->
numOf
AssignedCols
<
spd
->
numOfCols
)
{
if
(
spd
->
numOf
Bound
<
spd
->
numOfCols
)
{
char
*
ptr
=
payload
;
for
(
int32_t
i
=
0
;
i
<
spd
->
numOfCols
;
++
i
)
{
if
(
!
spd
->
hasVal
[
i
])
{
// current column do not have any value to insert, set it to null
if
(
!
spd
->
cols
[
i
].
hasVal
)
{
// current column do not have any value to insert, set it to null
if
(
schema
[
i
].
type
==
TSDB_DATA_TYPE_BINARY
)
{
varDataSetLen
(
ptr
,
sizeof
(
int8_t
));
*
(
uint8_t
*
)
varDataVal
(
ptr
)
=
TSDB_DATA_BINARY_NULL
;
...
...
@@ -522,7 +520,8 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
rowSize
=
(
int32_t
)(
ptr
-
payload
);
}
return
rowSize
;
*
len
=
rowSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
rowDataCompar
(
const
void
*
lhs
,
const
void
*
rhs
)
{
...
...
@@ -536,80 +535,79 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) {
}
}
int
tsParseValues
(
char
**
str
,
STableDataBlocks
*
pDataBlock
,
STableMeta
*
pTableMeta
,
int
maxRows
,
SParsedDataColInfo
*
spd
,
SSqlCmd
*
pCmd
,
int32_t
*
code
,
char
*
tmpTokenBuf
)
{
int32_t
index
=
0
;
SStrToken
sToken
;
int32_t
tsParseValues
(
char
**
str
,
STableDataBlocks
*
pDataBlock
,
int
maxRows
,
SSqlCmd
*
pCmd
,
int32_t
*
numOfRows
,
char
*
tmpTokenBuf
)
{
int32_t
index
=
0
;
int32_t
code
=
0
;
int32_t
numOfRows
=
0
;
(
*
numOfRows
)
=
0
;
SSchema
*
pSchema
=
tscGetTableSchema
(
pTableMeta
);
SStrToken
sToken
;
STableMeta
*
pTableMeta
=
pDataBlock
->
pTableMeta
;
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMeta
);
int32_t
precision
=
tinfo
.
precision
;
if
(
spd
->
hasVal
[
0
]
==
false
)
{
*
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"primary timestamp column can not be null"
,
*
str
);
return
-
1
;
}
while
(
1
)
{
index
=
0
;
sToken
=
tStrGetToken
(
*
str
,
&
index
,
false
,
0
,
NULL
);
sToken
=
tStrGetToken
(
*
str
,
&
index
,
false
);
if
(
sToken
.
n
==
0
||
sToken
.
type
!=
TK_LP
)
break
;
*
str
+=
index
;
if
(
numOfRows
>=
maxRows
||
pDataBlock
->
size
+
tinfo
.
rowSize
>=
pDataBlock
->
nAllocSize
)
{
if
(
(
*
numOfRows
)
>=
maxRows
||
pDataBlock
->
size
+
tinfo
.
rowSize
>=
pDataBlock
->
nAllocSize
)
{
int32_t
tSize
;
*
code
=
tscAllocateMemIfNeed
(
pDataBlock
,
tinfo
.
rowSize
,
&
tSize
);
if
(
*
code
!=
TSDB_CODE_SUCCESS
)
{
//TODO pass the correct error code to client
code
=
tscAllocateMemIfNeed
(
pDataBlock
,
tinfo
.
rowSize
,
&
tSize
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
//TODO pass the correct error code to client
strcpy
(
pCmd
->
payload
,
"client out of memory"
);
return
-
1
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
ASSERT
(
tSize
>
maxRows
);
maxRows
=
tSize
;
}
int32_t
len
=
tsParseOneRowData
(
str
,
pDataBlock
,
pSchema
,
spd
,
pCmd
,
precision
,
code
,
tmpTokenBuf
);
if
(
len
<=
0
)
{
// error message has been set in tsParseOneRowData
return
-
1
;
int32_t
len
=
0
;
code
=
tsParseOneRow
(
str
,
pDataBlock
,
pCmd
,
precision
,
&
len
,
tmpTokenBuf
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// error message has been set in tsParseOneRow, return directly
return
TSDB_CODE_TSC_SQL_SYNTAX_ERROR
;
}
pDataBlock
->
size
+=
len
;
index
=
0
;
sToken
=
tStrGetToken
(
*
str
,
&
index
,
false
,
0
,
NULL
);
sToken
=
tStrGetToken
(
*
str
,
&
index
,
false
);
*
str
+=
index
;
if
(
sToken
.
n
==
0
||
sToken
.
type
!=
TK_RP
)
{
tscSQLSyntaxErrMsg
(
pCmd
->
payload
,
") expected"
,
*
str
);
*
code
=
TSDB_CODE_TSC_SQL_SYNTAX_ERROR
;
code
=
TSDB_CODE_TSC_SQL_SYNTAX_ERROR
;
return
-
1
;
}
numOfRows
++
;
(
*
numOfRows
)
++
;
}
if
(
numOfRows
<=
0
)
{
if
(
(
*
numOfRows
)
<=
0
)
{
strcpy
(
pCmd
->
payload
,
"no any data points"
);
*
code
=
TSDB_CODE_TSC_SQL_SYNTAX_ERROR
;
return
-
1
;
return
TSDB_CODE_TSC_SQL_SYNTAX_ERROR
;
}
else
{
return
numOfRows
;
return
TSDB_CODE_SUCCESS
;
}
}
static
void
tscSetAssignedColumnInfo
(
SParsedDataColInfo
*
spd
,
SSchema
*
pSchema
,
int32_t
numOfCols
)
{
spd
->
numOfCols
=
numOfCols
;
spd
->
numOfAssignedCols
=
numOfCols
;
void
tscSetBoundColumnInfo
(
SParsedDataColInfo
*
pColInfo
,
SSchema
*
pSchema
,
int32_t
numOfCols
)
{
pColInfo
->
numOfCols
=
numOfCols
;
pColInfo
->
numOfBound
=
numOfCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
spd
->
hasVal
[
i
]
=
true
;
spd
->
elems
[
i
].
colIndex
=
i
;
pColInfo
->
boundedColumns
=
calloc
(
pColInfo
->
numOfCols
,
sizeof
(
int32_t
));
pColInfo
->
cols
=
calloc
(
pColInfo
->
numOfCols
,
sizeof
(
SBoundColumn
));
for
(
int32_t
i
=
0
;
i
<
pColInfo
->
numOfCols
;
++
i
)
{
if
(
i
>
0
)
{
spd
->
elems
[
i
].
offset
=
spd
->
elems
[
i
-
1
].
offset
+
pSchema
[
i
-
1
].
bytes
;
pColInfo
->
cols
[
i
].
offset
=
pSchema
[
i
-
1
].
bytes
+
pColInfo
->
cols
[
i
-
1
].
offset
;
}
pColInfo
->
cols
[
i
].
hasVal
=
true
;
pColInfo
->
boundedColumns
[
i
]
=
i
;
}
}
...
...
@@ -697,33 +695,26 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
}
}
static
int32_t
doParseInsertStatement
(
SSqlCmd
*
pCmd
,
char
**
str
,
SParsedDataColInfo
*
spd
,
int32_t
*
totalNum
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
pCmd
->
clauseIndex
,
0
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMeta
);
STableDataBlocks
*
dataBuf
=
NULL
;
int32_t
ret
=
tscGetDataBlockFromList
(
pCmd
->
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_DEFAULT_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
tinfo
.
rowSize
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
&
dataBuf
,
NULL
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
static
int32_t
doParseInsertStatement
(
SSqlCmd
*
pCmd
,
char
**
str
,
STableDataBlocks
*
dataBuf
,
int32_t
*
totalNum
)
{
STableComInfo
tinfo
=
tscGetTableInfo
(
dataBuf
->
pTableMeta
);
int32_t
maxNumOfRows
;
ret
=
tscAllocateMemIfNeed
(
dataBuf
,
tinfo
.
rowSize
,
&
maxNumOfRows
);
if
(
TSDB_CODE_SUCCESS
!=
ret
)
{
int32_t
code
=
tscAllocateMemIfNeed
(
dataBuf
,
tinfo
.
rowSize
,
&
maxNumOfRows
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
int32_t
code
=
TSDB_CODE_TSC_INVALID_SQL
;
char
*
tmpTokenBuf
=
calloc
(
1
,
16
*
1024
);
// used for deleting Escape character: \\, \', \"
code
=
TSDB_CODE_TSC_INVALID_SQL
;
char
*
tmpTokenBuf
=
calloc
(
1
,
16
*
1024
);
// used for deleting Escape character: \\, \', \"
if
(
NULL
==
tmpTokenBuf
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
int32_t
numOfRows
=
tsParseValues
(
str
,
dataBuf
,
pTableMeta
,
maxNumOfRows
,
spd
,
pCmd
,
&
code
,
tmpTokenBuf
);
int32_t
numOfRows
=
0
;
code
=
tsParseValues
(
str
,
dataBuf
,
maxNumOfRows
,
pCmd
,
&
numOfRows
,
tmpTokenBuf
);
free
(
tmpTokenBuf
);
if
(
numOfRows
<=
0
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -736,25 +727,23 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColI
}
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)(
dataBuf
->
pData
);
code
=
tsSetBlockInfo
(
pBlocks
,
pTableMeta
,
numOfRows
);
code
=
tsSetBlockInfo
(
pBlocks
,
dataBuf
->
pTableMeta
,
numOfRows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"too many rows in sql, total number of rows should be less than 32767"
,
*
str
);
return
code
;
}
dataBuf
->
vgId
=
pTableMeta
->
vgId
;
dataBuf
->
numOfTables
=
1
;
*
totalNum
+=
numOfRows
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tscCheckIfCreateTable
(
char
**
sqlstr
,
SSqlObj
*
pSql
)
{
static
int32_t
tscCheckIfCreateTable
(
char
**
sqlstr
,
SSqlObj
*
pSql
,
char
**
boundColumn
)
{
int32_t
index
=
0
;
SStrToken
sToken
=
{
0
};
SStrToken
tableToken
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
const
int32_t
TABLE_INDEX
=
0
;
const
int32_t
STABLE_INDEX
=
1
;
...
...
@@ -767,38 +756,37 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
// get the token of specified table
index
=
0
;
tableToken
=
tStrGetToken
(
sql
,
&
index
,
false
,
0
,
NULL
);
tableToken
=
tStrGetToken
(
sql
,
&
index
,
false
);
sql
+=
index
;
char
*
cstart
=
NULL
;
char
*
cend
=
NULL
;
// skip possibly exists column list
index
=
0
;
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
,
0
,
NULL
);
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
);
sql
+=
index
;
int32_t
numOfColList
=
0
;
bool
createTable
=
false
;
// Bind table columns list in string, skip it and continue
if
(
sToken
.
type
==
TK_LP
)
{
cstart
=
&
sToken
.
z
[
0
];
index
=
0
;
*
boundColumn
=
&
sToken
.
z
[
0
];
while
(
1
)
{
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
,
0
,
NULL
);
index
=
0
;
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
);
if
(
sToken
.
type
==
TK_RP
)
{
cend
=
&
sToken
.
z
[
0
];
break
;
}
sql
+=
index
;
++
numOfColList
;
}
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
,
0
,
NULL
);
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
);
sql
+=
index
;
}
if
(
numOfColList
==
0
&&
cstart
!=
NULL
)
{
if
(
numOfColList
==
0
&&
(
*
boundColumn
)
!=
NULL
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
...
...
@@ -806,7 +794,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
if
(
sToken
.
type
==
TK_USING
)
{
// create table if not exists according to the super table
index
=
0
;
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
,
0
,
NULL
);
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
);
sql
+=
index
;
//the source super table is moved to the secondary position of the pTableMetaInfo list
...
...
@@ -835,82 +823,42 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
SSchema
*
pTagSchema
=
tscGetTableTagSchema
(
pSTableMetaInfo
->
pTableMeta
);
STableComInfo
tinfo
=
tscGetTableInfo
(
pSTableMetaInfo
->
pTableMeta
);
index
=
0
;
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
,
0
,
NULL
);
sql
+=
index
;
SParsedDataColInfo
spd
=
{
0
};
uint8_t
numOfTags
=
tscGetNumOfTags
(
pSTableMetaInfo
->
pTableMeta
);
spd
.
numOfCols
=
numOfTags
;
// if specify some tags column
if
(
sToken
.
type
!=
TK_LP
)
{
tscSetAssignedColumnInfo
(
&
spd
,
pTagSchema
,
numOfTags
);
}
else
{
/* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen)
* tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn); */
int16_t
offset
[
TSDB_MAX_COLUMNS
]
=
{
0
};
for
(
int32_t
t
=
1
;
t
<
numOfTags
;
++
t
)
{
offset
[
t
]
=
offset
[
t
-
1
]
+
pTagSchema
[
t
-
1
].
bytes
;
}
while
(
1
)
{
index
=
0
;
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
,
0
,
NULL
);
sql
+=
index
;
if
(
TK_STRING
==
sToken
.
type
)
{
strdequote
(
sToken
.
z
);
sToken
.
n
=
(
uint32_t
)
strtrim
(
sToken
.
z
);
}
if
(
sToken
.
type
==
TK_RP
)
{
break
;
}
bool
findColumnIndex
=
false
;
tscSetBoundColumnInfo
(
&
spd
,
pTagSchema
,
tscGetNumOfTags
(
pSTableMetaInfo
->
pTableMeta
));
// todo speedup by using hash list
for
(
int32_t
t
=
0
;
t
<
numOfTags
;
++
t
)
{
if
(
strncmp
(
sToken
.
z
,
pTagSchema
[
t
].
name
,
sToken
.
n
)
==
0
&&
strlen
(
pTagSchema
[
t
].
name
)
==
sToken
.
n
)
{
SParsedColElem
*
pElem
=
&
spd
.
elems
[
spd
.
numOfAssignedCols
++
];
pElem
->
offset
=
offset
[
t
];
pElem
->
colIndex
=
t
;
if
(
spd
.
hasVal
[
t
]
==
true
)
{
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"duplicated tag name"
,
sToken
.
z
);
}
spd
.
hasVal
[
t
]
=
true
;
findColumnIndex
=
true
;
break
;
}
}
index
=
0
;
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
);
if
(
sToken
.
type
!=
TK_TAGS
&&
sToken
.
type
!=
TK_LP
)
{
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"keyword TAGS expected"
,
sToken
.
z
);
}
if
(
!
findColumnIndex
)
{
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"invalid tag name"
,
sToken
.
z
);
}
// parse the bound tags column
if
(
sToken
.
type
==
TK_LP
)
{
/*
* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen)
* tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn);
*/
char
*
end
=
NULL
;
code
=
parseBoundColumns
(
pCmd
,
&
spd
,
pTagSchema
,
sql
,
&
end
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
if
(
spd
.
numOfAssignedCols
==
0
||
spd
.
numOfAssignedCols
>
numOfTags
)
{
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"tag name expected"
,
sToken
.
z
);
}
sql
=
end
;
index
=
0
;
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
,
0
,
NULL
);
index
=
0
;
// keywords of "TAGS"
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
);
sql
+=
index
;
}
else
{
sql
+=
index
;
}
if
(
sToken
.
type
!=
TK_TAGS
)
{
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"keyword TAGS expected"
,
sToken
.
z
);
}
index
=
0
;
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
,
0
,
NULL
);
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
);
sql
+=
index
;
if
(
sToken
.
type
!=
TK_LP
)
{
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
NULL
,
sToken
.
z
);
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"( is expected"
,
sToken
.
z
);
}
SKVRowBuilder
kvRowBuilder
=
{
0
};
...
...
@@ -918,13 +866,11 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
uint32_t
ignoreTokenTypes
=
TK_LP
;
uint32_t
numOfIgnoreToken
=
1
;
for
(
int
i
=
0
;
i
<
spd
.
numOfAssignedCols
;
++
i
)
{
SSchema
*
pSchema
=
pTagSchema
+
spd
.
elems
[
i
].
colIndex
;
for
(
int
i
=
0
;
i
<
spd
.
numOfBound
;
++
i
)
{
SSchema
*
pSchema
=
&
pTagSchema
[
spd
.
boundedColumns
[
i
]];
index
=
0
;
sToken
=
tStrGetToken
(
sql
,
&
index
,
true
,
numOfIgnoreToken
,
&
ignoreTokenTypes
);
sToken
=
tStrGetToken
(
sql
,
&
index
,
true
);
sql
+=
index
;
if
(
TK_ILLEGAL
==
sToken
.
type
)
{
...
...
@@ -943,7 +889,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
}
char
tagVal
[
TSDB_MAX_TAGS_LEN
];
code
=
tsParseOneColumn
Data
(
pSchema
,
&
sToken
,
tagVal
,
pCmd
->
payload
,
&
sql
,
false
,
tinfo
.
precision
);
code
=
tsParseOneColumn
(
pSchema
,
&
sToken
,
tagVal
,
pCmd
->
payload
,
&
sql
,
false
,
tinfo
.
precision
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
return
code
;
...
...
@@ -952,6 +898,8 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
tdAddColToKVRow
(
&
kvRowBuilder
,
pSchema
->
colId
,
pSchema
->
type
,
tagVal
);
}
tscDestroyBoundColumnInfo
(
&
spd
);
SKVRow
row
=
tdGetKVRowFromBuilder
(
&
kvRowBuilder
);
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
if
(
row
==
NULL
)
{
...
...
@@ -974,7 +922,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
pCmd
->
tagData
.
data
=
pTag
;
index
=
0
;
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
,
0
,
NULL
);
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
);
sql
+=
index
;
if
(
sToken
.
n
==
0
||
sToken
.
type
!=
TK_RP
)
{
return
tscSQLSyntaxErrMsg
(
pCmd
->
payload
,
") expected"
,
sToken
.
z
);
...
...
@@ -989,33 +937,21 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return
ret
;
}
createTable
=
true
;
code
=
tscGetTableMetaEx
(
pSql
,
pTableMetaInfo
,
true
);
if
(
TSDB_CODE_TSC_ACTION_IN_PROGRESS
==
code
)
{
return
code
;
}
}
else
{
if
(
cstart
!=
NULL
)
{
sql
=
cstart
;
}
else
{
sql
=
sToken
.
z
;
}
sql
=
sToken
.
z
;
code
=
tscGetTableMetaEx
(
pSql
,
pTableMetaInfo
,
false
);
if
(
pCmd
->
curSql
==
NULL
)
{
assert
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
);
}
}
int32_t
len
=
(
int32_t
)(
cend
-
cstart
+
1
);
if
(
cstart
!=
NULL
&&
createTable
==
true
)
{
/* move the column list to start position of the next accessed points */
memmove
(
sql
-
len
,
cstart
,
len
);
*
sqlstr
=
sql
-
len
;
}
else
{
*
sqlstr
=
sql
;
}
*
sqlstr
=
sql
;
if
(
*
sqlstr
==
NULL
)
{
code
=
TSDB_CODE_TSC_INVALID_SQL
;
...
...
@@ -1043,6 +979,76 @@ static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
parseBoundColumns
(
SSqlCmd
*
pCmd
,
SParsedDataColInfo
*
pColInfo
,
SSchema
*
pSchema
,
char
*
str
,
char
**
end
)
{
pColInfo
->
numOfBound
=
0
;
memset
(
pColInfo
->
boundedColumns
,
0
,
sizeof
(
int32_t
)
*
pColInfo
->
numOfCols
);
for
(
int32_t
i
=
0
;
i
<
pColInfo
->
numOfCols
;
++
i
)
{
pColInfo
->
cols
[
i
].
hasVal
=
false
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
index
=
0
;
SStrToken
sToken
=
tStrGetToken
(
str
,
&
index
,
false
);
str
+=
index
;
if
(
sToken
.
type
!=
TK_LP
)
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"( is expected"
,
sToken
.
z
);
goto
_clean
;
}
while
(
1
)
{
index
=
0
;
sToken
=
tStrGetToken
(
str
,
&
index
,
false
);
str
+=
index
;
if
(
TK_STRING
==
sToken
.
type
)
{
tscDequoteAndTrimToken
(
&
sToken
);
}
if
(
sToken
.
type
==
TK_RP
)
{
if
(
end
!=
NULL
)
{
// set the end position
*
end
=
str
;
}
break
;
}
bool
findColumnIndex
=
false
;
// todo speedup by using hash list
for
(
int32_t
t
=
0
;
t
<
pColInfo
->
numOfCols
;
++
t
)
{
if
(
strncmp
(
sToken
.
z
,
pSchema
[
t
].
name
,
sToken
.
n
)
==
0
&&
strlen
(
pSchema
[
t
].
name
)
==
sToken
.
n
)
{
if
(
pColInfo
->
cols
[
t
].
hasVal
==
true
)
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"duplicated column name"
,
sToken
.
z
);
goto
_clean
;
}
pColInfo
->
cols
[
t
].
hasVal
=
true
;
pColInfo
->
boundedColumns
[
pColInfo
->
numOfBound
]
=
t
;
pColInfo
->
numOfBound
+=
1
;
findColumnIndex
=
true
;
break
;
}
}
if
(
!
findColumnIndex
)
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"invalid column/tag name"
,
sToken
.
z
);
goto
_clean
;
}
}
memset
(
&
pColInfo
->
boundedColumns
[
pColInfo
->
numOfBound
],
0
,
sizeof
(
int32_t
)
*
(
pColInfo
->
numOfCols
-
pColInfo
->
numOfBound
));
return
TSDB_CODE_SUCCESS
;
_clean:
pCmd
->
curSql
=
NULL
;
pCmd
->
parseFinished
=
1
;
return
code
;
}
/**
* parse insert sql
* @param pSql
...
...
@@ -1083,7 +1089,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
while
(
1
)
{
int32_t
index
=
0
;
SStrToken
sToken
=
tStrGetToken
(
str
,
&
index
,
false
,
0
,
NULL
);
SStrToken
sToken
=
tStrGetToken
(
str
,
&
index
,
false
);
// no data in the sql string anymore.
if
(
sToken
.
n
==
0
)
{
...
...
@@ -1108,7 +1114,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
pCmd
->
curSql
=
sToken
.
z
;
char
buf
[
TSDB_TABLE_FNAME_LEN
];
char
buf
[
TSDB_TABLE_FNAME_LEN
];
SStrToken
sTblToken
;
sTblToken
.
z
=
buf
;
// Check if the table name available or not
...
...
@@ -1121,7 +1127,8 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto
_clean
;
}
if
((
code
=
tscCheckIfCreateTable
(
&
str
,
pSql
))
!=
TSDB_CODE_SUCCESS
)
{
char
*
bindedColumns
=
NULL
;
if
((
code
=
tscCheckIfCreateTable
(
&
str
,
pSql
,
&
bindedColumns
))
!=
TSDB_CODE_SUCCESS
)
{
/*
* After retrieving the table meta from server, the sql string will be parsed from the paused position.
* And during the getTableMetaCallback function, the sql string will be parsed from the paused position.
...
...
@@ -1129,7 +1136,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
if
(
TSDB_CODE_TSC_ACTION_IN_PROGRESS
==
code
)
{
return
code
;
}
tscError
(
"%p async insert parse error, code:%s"
,
pSql
,
tstrerror
(
code
));
pCmd
->
curSql
=
NULL
;
goto
_clean
;
...
...
@@ -1141,41 +1148,22 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
index
=
0
;
sToken
=
tStrGetToken
(
str
,
&
index
,
false
,
0
,
NULL
);
sToken
=
tStrGetToken
(
str
,
&
index
,
false
);
str
+=
index
;
if
(
sToken
.
n
==
0
)
{
if
(
sToken
.
n
==
0
||
(
sToken
.
type
!=
TK_FILE
&&
sToken
.
type
!=
TK_VALUES
)
)
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"keyword VALUES or FILE required"
,
sToken
.
z
);
goto
_clean
;
}
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
if
(
sToken
.
type
==
TK_VALUES
)
{
SParsedDataColInfo
spd
=
{.
numOfCols
=
tinfo
.
numOfColumns
};
SSchema
*
pSchema
=
tscGetTableSchema
(
pTableMetaInfo
->
pTableMeta
);
tscSetAssignedColumnInfo
(
&
spd
,
pSchema
,
tinfo
.
numOfColumns
);
if
(
validateDataSource
(
pCmd
,
DATA_FROM_SQL_STRING
,
sToken
.
z
)
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
}
/*
* app here insert data in different vnodes, so we need to set the following
* data in another submit procedure using async insert routines
*/
code
=
doParseInsertStatement
(
pCmd
,
&
str
,
&
spd
,
&
totalNum
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
}
}
else
if
(
sToken
.
type
==
TK_FILE
)
{
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
if
(
sToken
.
type
==
TK_FILE
)
{
if
(
validateDataSource
(
pCmd
,
DATA_FROM_DATA_FILE
,
sToken
.
z
)
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
}
index
=
0
;
sToken
=
tStrGetToken
(
str
,
&
index
,
false
,
0
,
NULL
);
sToken
=
tStrGetToken
(
str
,
&
index
,
false
);
if
(
sToken
.
type
!=
TK_STRING
&&
sToken
.
type
!=
TK_ID
)
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"file path is required following keyword FILE"
,
sToken
.
z
);
goto
_clean
;
...
...
@@ -1199,83 +1187,63 @@ int tsParseInsertSql(SSqlObj *pSql) {
tstrncpy
(
pCmd
->
payload
,
full_path
.
we_wordv
[
0
],
pCmd
->
allocSize
);
wordfree
(
&
full_path
);
}
else
if
(
sToken
.
type
==
TK_LP
)
{
/* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
STableMeta
*
pTableMeta
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
pCmd
->
clauseIndex
,
0
)
->
pTableMeta
;
SSchema
*
pSchema
=
tscGetTableSchema
(
pTableMeta
);
if
(
validateDataSource
(
pCmd
,
DATA_FROM_SQL_STRING
,
sToken
.
z
)
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
}
SParsedDataColInfo
spd
=
{
0
};
spd
.
numOfCols
=
tinfo
.
numOfColumns
;
int16_t
offset
[
TSDB_MAX_COLUMNS
]
=
{
0
};
for
(
int32_t
t
=
1
;
t
<
tinfo
.
numOfColumns
;
++
t
)
{
offset
[
t
]
=
offset
[
t
-
1
]
+
pSchema
[
t
-
1
].
bytes
;
}
while
(
1
)
{
index
=
0
;
sToken
=
tStrGetToken
(
str
,
&
index
,
false
,
0
,
NULL
);
str
+=
index
;
}
else
{
if
(
bindedColumns
==
NULL
)
{
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
if
(
TK_STRING
==
sToken
.
type
)
{
tscDequoteAndTrimToken
(
&
sToken
)
;
if
(
validateDataSource
(
pCmd
,
DATA_FROM_SQL_STRING
,
sToken
.
z
)
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
}
if
(
sToken
.
type
==
TK_RP
)
{
break
;
STableDataBlocks
*
dataBuf
=
NULL
;
int32_t
ret
=
tscGetDataBlockFromList
(
pCmd
->
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_DEFAULT_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
tinfo
.
rowSize
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
&
dataBuf
,
NULL
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
}
bool
findColumnIndex
=
false
;
// todo speedup by using hash list
for
(
int32_t
t
=
0
;
t
<
tinfo
.
numOfColumns
;
++
t
)
{
if
(
strncmp
(
sToken
.
z
,
pSchema
[
t
].
name
,
sToken
.
n
)
==
0
&&
strlen
(
pSchema
[
t
].
name
)
==
sToken
.
n
)
{
SParsedColElem
*
pElem
=
&
spd
.
elems
[
spd
.
numOfAssignedCols
++
];
pElem
->
offset
=
offset
[
t
];
pElem
->
colIndex
=
t
;
if
(
spd
.
hasVal
[
t
]
==
true
)
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"duplicated column name"
,
sToken
.
z
);
goto
_clean
;
}
code
=
doParseInsertStatement
(
pCmd
,
&
str
,
dataBuf
,
&
totalNum
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
}
}
else
{
// bindedColumns != NULL
// insert into tablename(col1, col2,..., coln) values(v1, v2,... vn);
STableMeta
*
pTableMeta
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
pCmd
->
clauseIndex
,
0
)
->
pTableMeta
;
spd
.
hasVal
[
t
]
=
true
;
findColumnIndex
=
true
;
break
;
}
if
(
validateDataSource
(
pCmd
,
DATA_FROM_SQL_STRING
,
sToken
.
z
)
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
}
if
(
!
findColumnIndex
)
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"invalid column name"
,
sToken
.
z
);
STableDataBlocks
*
dataBuf
=
NULL
;
int32_t
ret
=
tscGetDataBlockFromList
(
pCmd
->
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_DEFAULT_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
tinfo
.
rowSize
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
&
dataBuf
,
NULL
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
}
}
if
(
spd
.
numOfAssignedCols
==
0
||
spd
.
numOfAssignedCols
>
tinfo
.
numOfColumns
)
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"column name expected"
,
sToken
.
z
);
goto
_clean
;
}
SSchema
*
pSchema
=
tscGetTableSchema
(
pTableMeta
);
code
=
parseBoundColumns
(
pCmd
,
&
dataBuf
->
boundColumnInfo
,
pSchema
,
bindedColumns
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
}
index
=
0
;
sToken
=
tStrGetToken
(
str
,
&
index
,
false
,
0
,
NULL
);
str
+=
index
;
if
(
dataBuf
->
boundColumnInfo
.
cols
[
0
].
hasVal
==
false
)
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"primary timestamp column can not be null"
,
NULL
);
goto
_clean
;
}
if
(
sToken
.
type
!=
TK_VALUES
)
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"keyword VALUES is expected"
,
sToken
.
z
);
goto
_clean
;
}
if
(
sToken
.
type
!=
TK_VALUES
)
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"keyword VALUES is expected"
,
sToken
.
z
);
goto
_clean
;
}
code
=
doParseInsertStatement
(
pCmd
,
&
str
,
&
spd
,
&
totalNum
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
code
=
doParseInsertStatement
(
pCmd
,
&
str
,
dataBuf
,
&
totalNum
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
}
}
}
else
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"keyword VALUES or FILE are required"
,
sToken
.
z
);
goto
_clean
;
}
}
...
...
@@ -1294,7 +1262,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto
_clean
;
_clean:
pCmd
->
curSql
=
NULL
;
pCmd
->
curSql
=
NULL
;
pCmd
->
parseFinished
=
1
;
return
code
;
}
...
...
@@ -1307,7 +1275,7 @@ int tsInsertInitialCheck(SSqlObj *pSql) {
int32_t
index
=
0
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SStrToken
sToken
=
tStrGetToken
(
pSql
->
sqlstr
,
&
index
,
false
,
0
,
NULL
);
SStrToken
sToken
=
tStrGetToken
(
pSql
->
sqlstr
,
&
index
,
false
);
assert
(
sToken
.
type
==
TK_INSERT
||
sToken
.
type
==
TK_IMPORT
);
pCmd
->
count
=
0
;
...
...
@@ -1317,7 +1285,7 @@ int tsInsertInitialCheck(SSqlObj *pSql) {
TSDB_QUERY_SET_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_INSERT
|
pCmd
->
insertType
);
sToken
=
tStrGetToken
(
pSql
->
sqlstr
,
&
index
,
false
,
0
,
NULL
);
sToken
=
tStrGetToken
(
pSql
->
sqlstr
,
&
index
,
false
);
if
(
sToken
.
type
!=
TK_INTO
)
{
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"keyword INTO is expected"
,
sToken
.
z
);
}
...
...
@@ -1450,13 +1418,10 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
pCmd
->
clauseIndex
,
0
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
SSchema
*
pSchema
=
tscGetTableSchema
(
pTableMeta
);
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMeta
);
SParsedDataColInfo
spd
=
{.
numOfCols
=
tinfo
.
numOfColumns
};
tscSetAssignedColumnInfo
(
&
spd
,
pSchema
,
tinfo
.
numOfColumns
);
destroyTableNameList
(
pCmd
);
tfree
(
pCmd
->
pTableNameList
);
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
if
(
pCmd
->
pTableBlockHashList
==
NULL
)
{
...
...
@@ -1495,8 +1460,9 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
char
*
lineptr
=
line
;
strtolower
(
line
,
line
);
int32_t
len
=
tsParseOneRowData
(
&
lineptr
,
pTableDataBlock
,
pSchema
,
&
spd
,
pCmd
,
tinfo
.
precision
,
&
code
,
tokenBuf
);
if
(
len
<=
0
||
pTableDataBlock
->
numOfParams
>
0
)
{
int32_t
len
=
0
;
code
=
tsParseOneRow
(
&
lineptr
,
pTableDataBlock
,
pCmd
,
tinfo
.
precision
,
&
len
,
tokenBuf
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pTableDataBlock
->
numOfParams
>
0
)
{
pSql
->
res
.
code
=
code
;
break
;
}
...
...
src/client/src/tscUtil.c
浏览文件 @
15da33e0
...
...
@@ -415,6 +415,20 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
tfree
(
pCmd
->
pQueryInfo
);
}
void
destroyTableNameList
(
SSqlCmd
*
pCmd
)
{
if
(
pCmd
->
numOfTables
==
0
)
{
assert
(
pCmd
->
pTableNameList
==
NULL
);
return
;
}
for
(
int32_t
i
=
0
;
i
<
pCmd
->
numOfTables
;
++
i
)
{
tfree
(
pCmd
->
pTableNameList
[
i
]);
}
pCmd
->
numOfTables
=
0
;
tfree
(
pCmd
->
pTableNameList
);
}
void
tscResetSqlCmd
(
SSqlCmd
*
pCmd
,
bool
removeMeta
)
{
pCmd
->
command
=
0
;
pCmd
->
numOfCols
=
0
;
...
...
@@ -424,14 +438,7 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) {
pCmd
->
parseFinished
=
0
;
pCmd
->
autoCreated
=
0
;
for
(
int32_t
i
=
0
;
i
<
pCmd
->
numOfTables
;
++
i
)
{
if
(
pCmd
->
pTableNameList
&&
pCmd
->
pTableNameList
[
i
])
{
tfree
(
pCmd
->
pTableNameList
[
i
]);
}
}
pCmd
->
numOfTables
=
0
;
tfree
(
pCmd
->
pTableNameList
);
destroyTableNameList
(
pCmd
);
pCmd
->
pTableBlockHashList
=
tscDestroyBlockHashTable
(
pCmd
->
pTableBlockHashList
,
removeMeta
);
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
...
...
@@ -548,6 +555,11 @@ void tscFreeSqlObj(SSqlObj* pSql) {
free
(
pSql
);
}
void
tscDestroyBoundColumnInfo
(
SParsedDataColInfo
*
pColInfo
)
{
tfree
(
pColInfo
->
boundedColumns
);
tfree
(
pColInfo
->
cols
);
}
void
tscDestroyDataBlock
(
STableDataBlocks
*
pDataBlock
,
bool
removeMeta
)
{
if
(
pDataBlock
==
NULL
)
{
return
;
...
...
@@ -568,6 +580,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
taosHashRemove
(
tscTableMetaInfo
,
name
,
strnlen
(
name
,
TSDB_TABLE_FNAME_LEN
));
}
tscDestroyBoundColumnInfo
(
&
pDataBlock
->
boundColumnInfo
);
tfree
(
pDataBlock
);
}
...
...
@@ -678,7 +691,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
* @param dataBlocks
* @return
*/
int32_t
tscCreateDataBlock
(
size_t
initial
Size
,
int32_t
rowSize
,
int32_t
startOffset
,
SName
*
name
,
int32_t
tscCreateDataBlock
(
size_t
default
Size
,
int32_t
rowSize
,
int32_t
startOffset
,
SName
*
name
,
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
)
{
STableDataBlocks
*
dataBuf
=
(
STableDataBlocks
*
)
calloc
(
1
,
sizeof
(
STableDataBlocks
));
if
(
dataBuf
==
NULL
)
{
...
...
@@ -686,10 +699,12 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
dataBuf
->
nAllocSize
=
(
uint32_t
)
initialSize
;
dataBuf
->
headerSize
=
startOffset
;
// the header size will always be the startOffset value, reserved for the subumit block header
dataBuf
->
nAllocSize
=
(
uint32_t
)
defaultSize
;
dataBuf
->
headerSize
=
startOffset
;
// the header size will always be the startOffset value, reserved for the subumit block header
if
(
dataBuf
->
nAllocSize
<=
dataBuf
->
headerSize
)
{
dataBuf
->
nAllocSize
=
dataBuf
->
headerSize
*
2
;
dataBuf
->
nAllocSize
=
dataBuf
->
headerSize
*
2
;
}
dataBuf
->
pData
=
calloc
(
1
,
dataBuf
->
nAllocSize
);
...
...
@@ -699,25 +714,31 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
dataBuf
->
ordered
=
true
;
dataBuf
->
prevTS
=
INT64_MIN
;
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf
->
pTableMeta
=
tscTableMetaDup
(
pTableMeta
);
SParsedDataColInfo
*
pColInfo
=
&
dataBuf
->
boundColumnInfo
;
SSchema
*
pSchema
=
tscGetTableSchema
(
dataBuf
->
pTableMeta
);
tscSetBoundColumnInfo
(
pColInfo
,
pSchema
,
dataBuf
->
pTableMeta
->
tableInfo
.
numOfColumns
);
dataBuf
->
rowSize
=
rowSize
;
dataBuf
->
size
=
startOffset
;
dataBuf
->
ordered
=
true
;
dataBuf
->
prevTS
=
INT64_MIN
;
dataBuf
->
rowSize
=
rowSize
;
dataBuf
->
size
=
startOffset
;
dataBuf
->
tsSource
=
-
1
;
dataBuf
->
vgId
=
dataBuf
->
pTableMeta
->
vgId
;
tNameAssign
(
&
dataBuf
->
tableName
,
name
);
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf
->
pTableMeta
=
tscTableMetaDup
(
pTableMeta
);
assert
(
initialSize
>
0
&&
pTableMeta
!=
NULL
&&
dataBuf
->
pTableMeta
!=
NULL
);
assert
(
defaultSize
>
0
&&
pTableMeta
!=
NULL
&&
dataBuf
->
pTableMeta
!=
NULL
);
*
dataBlocks
=
dataBuf
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
tscGetDataBlockFromList
(
SHashObj
*
pHashList
,
int64_t
id
,
int32_t
size
,
int32_t
startOffset
,
int32_t
rowSize
,
SName
*
name
,
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
,
SArray
*
pBlockList
)
{
SName
*
name
,
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
,
SArray
*
pBlockList
)
{
*
dataBlocks
=
NULL
;
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taosHashGet
(
pHashList
,
(
const
char
*
)
&
id
,
sizeof
(
id
));
if
(
t1
!=
NULL
)
{
...
...
@@ -826,6 +847,8 @@ static void extractTableNameList(SSqlCmd* pCmd, bool freeBlockMap) {
int32_t
i
=
0
;
while
(
p1
)
{
STableDataBlocks
*
pBlocks
=
*
p1
;
tfree
(
pCmd
->
pTableNameList
[
i
]);
pCmd
->
pTableNameList
[
i
++
]
=
tNameDup
(
&
pBlocks
->
tableName
);
p1
=
taosHashIterate
(
pCmd
->
pTableBlockHashList
,
p1
);
}
...
...
@@ -942,7 +965,7 @@ bool tscIsInsertData(char* sqlstr) {
int32_t
index
=
0
;
do
{
SStrToken
t0
=
tStrGetToken
(
sqlstr
,
&
index
,
false
,
0
,
NULL
);
SStrToken
t0
=
tStrGetToken
(
sqlstr
,
&
index
,
false
);
if
(
t0
.
type
!=
TK_LP
)
{
return
t0
.
type
==
TK_INSERT
||
t0
.
type
==
TK_IMPORT
;
}
...
...
src/query/src/qTokenizer.c
浏览文件 @
15da33e0
...
...
@@ -560,7 +560,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenId) {
return
0
;
}
SStrToken
tStrGetToken
(
char
*
str
,
int32_t
*
i
,
bool
isPrevOptr
,
uint32_t
numOfIgnoreToken
,
uint32_t
*
ignoreTokenTypes
)
{
SStrToken
tStrGetToken
(
char
*
str
,
int32_t
*
i
,
bool
isPrevOptr
)
{
SStrToken
t0
=
{
0
};
// here we reach the end of sql string, null-terminated string
...
...
@@ -585,7 +585,10 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn
}
t0
.
n
=
tSQLGetToken
(
&
str
[
*
i
],
&
t0
.
type
);
break
;
// not support user specfied ignored symbol list
#if 0
bool ignore = false;
for (uint32_t k = 0; k < numOfIgnoreToken; k++) {
if (t0.type == ignoreTokenTypes[k]) {
...
...
@@ -597,6 +600,7 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn
if (!ignore) {
break;
}
#endif
}
if
(
t0
.
type
==
TK_SEMI
)
{
...
...
src/query/tests/resultBufferTest.cpp
浏览文件 @
15da33e0
...
...
@@ -10,7 +10,7 @@ namespace {
// simple test
void
simpleTest
()
{
SDiskbasedResultBuf
*
pResultBuf
=
NULL
;
int32_t
ret
=
createDiskbasedResultBuffer
(
&
pResultBuf
,
1024
,
4096
,
NULL
);
int32_t
ret
=
createDiskbasedResultBuffer
(
&
pResultBuf
,
1024
,
4096
,
1
);
int32_t
pageId
=
0
;
int32_t
groupId
=
0
;
...
...
@@ -52,7 +52,7 @@ void simpleTest() {
void
writeDownTest
()
{
SDiskbasedResultBuf
*
pResultBuf
=
NULL
;
int32_t
ret
=
createDiskbasedResultBuffer
(
&
pResultBuf
,
1024
,
4
*
1024
,
NULL
);
int32_t
ret
=
createDiskbasedResultBuffer
(
&
pResultBuf
,
1024
,
4
*
1024
,
1
);
int32_t
pageId
=
0
;
int32_t
writePageId
=
0
;
...
...
@@ -99,7 +99,7 @@ void writeDownTest() {
void
recyclePageTest
()
{
SDiskbasedResultBuf
*
pResultBuf
=
NULL
;
int32_t
ret
=
createDiskbasedResultBuffer
(
&
pResultBuf
,
1024
,
4
*
1024
,
NULL
);
int32_t
ret
=
createDiskbasedResultBuffer
(
&
pResultBuf
,
1024
,
4
*
1024
,
1
);
int32_t
pageId
=
0
;
int32_t
writePageId
=
0
;
...
...
src/util/inc/tstoken.h
浏览文件 @
15da33e0
...
...
@@ -51,11 +51,9 @@ uint32_t tSQLGetToken(char *z, uint32_t *tokenType);
* @param str
* @param i
* @param isPrevOptr
* @param numOfIgnoreToken
* @param ignoreTokenTypes
* @return
*/
SStrToken
tStrGetToken
(
char
*
str
,
int32_t
*
i
,
bool
isPrevOptr
,
uint32_t
numOfIgnoreToken
,
uint32_t
*
ignoreTokenTypes
);
SStrToken
tStrGetToken
(
char
*
str
,
int32_t
*
i
,
bool
isPrevOptr
);
/**
* check if it is a keyword or not
...
...
tests/script/general/parser/alter.sim
浏览文件 @
15da33e0
...
...
@@ -204,7 +204,13 @@ if $data03 != NULL then
return -1
endi
sql reset query cache
print ============================>TD-3366 TD-3486
sql insert into td_3366(ts, c3, c1) using mt(t1) tags(911) values('2018-1-1 11:11:11', 'new1', 12);
sql insert into td_3486(ts, c3, c1) using mt(t1) tags(-12) values('2018-1-1 11:11:11', 'new1', 12);
sql insert into ttxu(ts, c3, c1) using mt(t1) tags('-121') values('2018-1-1 11:11:11', 'new1', 12);
sql insert into tb(ts, c1, c3) using mt(t1) tags(123) values('2018-11-01 16:29:58.000', 2, 'port')
sql insert into tb values ('2018-11-01 16:29:58.000', 2, 'import', 3)
sql import into tb values ('2018-11-01 16:29:58.000', 2, 'import', 3)
sql import into tb values ('2018-11-01 16:39:58.000', 2, 'import', 3)
...
...
@@ -212,6 +218,7 @@ sql select * from tb order by ts desc
if $rows != 4 then
return -1
endi
if $data03 != 3 then
return -1
endi
...
...
@@ -233,10 +240,10 @@ sql_error alter table mt add column c1 int
# drop non-existing columns
sql_error alter table mt drop column c9
sql drop database $db
sql show databases
if $rows != 0 then
return -1
endi
#
sql drop database $db
#
sql show databases
#if $rows != 0 then
#
return -1
#
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/script/general/parser/gendata.sh
0 → 100755
浏览文件 @
15da33e0
#!/bin/bash
Cur_Dir
=
$(
pwd
)
echo
$Cur_Dir
echo
"'2020-1-1 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'"
>>
~/data.sql
tests/script/general/parser/import_file.sim
浏览文件 @
15da33e0
...
...
@@ -8,32 +8,28 @@ sql connect
sleep 500
sql drop database if exists indb
sql create database if not exists indb
sql use indb
$inFileName = '~/data.csv'
$numOfRows = 10000
#system sh/gendata.sh $inFileName $numOfRows # input file invalid
system sh/gendata.sh ~/data.csv $numOfRows
system general/parser/gendata.sh
sql create table tbx (ts TIMESTAMP, collect_area NCHAR(12), device_id BINARY(16), imsi BINARY(16), imei BINARY(16), mdn BINARY(10), net_type BINARY(4), mno NCHAR(4), province NCHAR(10), city NCHAR(16), alarm BINARY(2))
print ====== create tables success, starting import data
sql import into tbx file
$inFileName
sql import into tbx file
'~/data.sql'
sql select count(*) from tbx
if $rows != 1 then
return -1
endi
if $data00 != $numOfRows then
print "expect: $numOfRows, act: $data00"
return -1
endi
#if $data00 != $numOfRows then
# print "expect: $numOfRows, act: $data00"
# return -1
#endi
#system rm -f $inFileName # invalid shell
system rm -f ~/data.csv
system rm -f ~/data.sql
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录