Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
25d6bbaf
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
25d6bbaf
编写于
10月 18, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
10月 18, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17409 from taosdata/feature/TD-19221-3.0
feat:add new interface for schemaless
上级
3cbfd2d1
bce4fc9c
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
226 addition
and
91 deletion
+226
-91
include/client/taos.h
include/client/taos.h
+1
-0
source/client/src/clientSml.c
source/client/src/clientSml.c
+153
-77
source/client/test/smlTest.cpp
source/client/test/smlTest.cpp
+14
-14
utils/test/c/sml_test.c
utils/test/c/sml_test.c
+58
-0
未找到文件。
include/client/taos.h
浏览文件 @
25d6bbaf
...
...
@@ -198,6 +198,7 @@ DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
DLL_EXPORT
int
taos_load_table_info
(
TAOS
*
taos
,
const
char
*
tableNameList
);
DLL_EXPORT
TAOS_RES
*
taos_schemaless_insert
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
int
protocol
,
int
precision
);
DLL_EXPORT
TAOS_RES
*
taos_schemaless_insert_raw
(
TAOS
*
taos
,
char
*
lines
,
int
len
,
int32_t
*
totalRows
,
int
protocol
,
int
precision
);
/* --------------------------TMQ INTERFACE------------------------------- */
...
...
source/client/src/clientSml.c
浏览文件 @
25d6bbaf
...
...
@@ -28,8 +28,8 @@
#define QUOTE '"'
#define SLASH '\\'
#define JUMP_SPACE(sql) \
while (
*sql != '\0'
) { \
#define JUMP_SPACE(sql
, sqlEnd
) \
while (
sql < sqlEnd
) { \
if (*sql == SPACE) \
sql++; \
else \
...
...
@@ -917,16 +917,17 @@ static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
static
int32_t
smlParseInfluxString
(
const
char
*
sql
,
SSmlLineInfo
*
elements
,
SSmlMsgBuf
*
msg
)
{
static
int32_t
smlParseInfluxString
(
const
char
*
sql
,
const
char
*
sqlEnd
,
SSmlLineInfo
*
elements
,
SSmlMsgBuf
*
msg
)
{
if
(
!
sql
)
return
TSDB_CODE_SML_INVALID_DATA
;
JUMP_SPACE
(
sql
)
JUMP_SPACE
(
sql
,
sqlEnd
)
if
(
*
sql
==
COMMA
)
return
TSDB_CODE_SML_INVALID_DATA
;
elements
->
measure
=
sql
;
// parse measure
while
(
*
sql
!=
'\0'
)
{
while
(
sql
<
sqlEnd
)
{
if
((
sql
!=
elements
->
measure
)
&&
IS_SLASH_LETTER
(
sql
))
{
MOVE_FORWARD_ONE
(
sql
,
strlen
(
sql
)
+
1
);
MOVE_FORWARD_ONE
(
sql
,
sqlEnd
-
sql
);
sqlEnd
--
;
continue
;
}
if
(
IS_COMMA
(
sql
))
{
...
...
@@ -950,7 +951,7 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm
}
else
{
if
(
*
sql
==
COMMA
)
sql
++
;
elements
->
tags
=
sql
;
while
(
*
sql
!=
'\0'
)
{
while
(
sql
<
sqlEnd
)
{
if
(
IS_SPACE
(
sql
))
{
break
;
}
...
...
@@ -961,10 +962,10 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm
elements
->
measureTagsLen
=
sql
-
elements
->
measure
;
// parse cols
JUMP_SPACE
(
sql
)
JUMP_SPACE
(
sql
,
sqlEnd
)
elements
->
cols
=
sql
;
bool
isInQuote
=
false
;
while
(
*
sql
!=
'\0'
)
{
while
(
sql
<
sqlEnd
)
{
if
(
IS_QUOTE
(
sql
))
{
isInQuote
=
!
isInQuote
;
}
...
...
@@ -984,10 +985,10 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm
}
// parse timestamp
JUMP_SPACE
(
sql
)
JUMP_SPACE
(
sql
,
sqlEnd
)
elements
->
timestamp
=
sql
;
while
(
*
sql
!=
'\0'
)
{
if
(
*
sql
==
SPACE
)
{
while
(
sql
<
sqlEnd
)
{
if
(
isspace
(
*
sql
)
)
{
break
;
}
sql
++
;
...
...
@@ -997,8 +998,8 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm
return
TSDB_CODE_SUCCESS
;
}
static
void
smlParseTelnetElement
(
const
char
**
sql
,
const
char
**
data
,
int32_t
*
len
)
{
while
(
*
*
sql
!=
'\0'
)
{
static
void
smlParseTelnetElement
(
const
char
**
sql
,
const
char
*
sqlEnd
,
const
char
*
*
data
,
int32_t
*
len
)
{
while
(
*
sql
<
sqlEnd
)
{
if
(
**
sql
!=
SPACE
&&
!
(
*
data
))
{
*
data
=
*
sql
;
}
else
if
(
**
sql
==
SPACE
&&
*
data
)
{
...
...
@@ -1009,20 +1010,20 @@ static void smlParseTelnetElement(const char **sql, const char **data, int32_t *
}
}
static
int32_t
smlParseTelnetTags
(
const
char
*
data
,
SArray
*
cols
,
char
*
childTableName
,
SHashObj
*
dumplicateKey
,
static
int32_t
smlParseTelnetTags
(
const
char
*
data
,
const
char
*
sqlEnd
,
SArray
*
cols
,
char
*
childTableName
,
SHashObj
*
dumplicateKey
,
SSmlMsgBuf
*
msg
)
{
if
(
!
cols
)
return
TSDB_CODE_OUT_OF_MEMORY
;
const
char
*
sql
=
data
;
size_t
childTableNameLen
=
strlen
(
tsSmlChildTableName
);
while
(
*
sql
!=
'\0'
)
{
JUMP_SPACE
(
sql
)
while
(
sql
<
sqlEnd
)
{
JUMP_SPACE
(
sql
,
sqlEnd
)
if
(
*
sql
==
'\0'
)
break
;
const
char
*
key
=
sql
;
int32_t
keyLen
=
0
;
// parse key
while
(
*
sql
!=
'\0'
)
{
while
(
sql
<
sqlEnd
)
{
if
(
*
sql
==
SPACE
)
{
smlBuildInvalidDataMsg
(
msg
,
"invalid data"
,
sql
);
return
TSDB_CODE_SML_INVALID_DATA
;
...
...
@@ -1047,7 +1048,7 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab
// parse value
const
char
*
value
=
sql
;
int32_t
valueLen
=
0
;
while
(
*
sql
!=
'\0'
)
{
while
(
sql
<
sqlEnd
)
{
// parse value
if
(
*
sql
==
SPACE
)
{
break
;
...
...
@@ -1092,11 +1093,11 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab
}
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
static
int32_t
smlParseTelnetString
(
SSmlHandle
*
info
,
const
char
*
sql
,
SSmlTableInfo
*
tinfo
,
SArray
*
cols
)
{
static
int32_t
smlParseTelnetString
(
SSmlHandle
*
info
,
const
char
*
sql
,
const
char
*
sqlEnd
,
SSmlTableInfo
*
tinfo
,
SArray
*
cols
)
{
if
(
!
sql
)
return
TSDB_CODE_SML_INVALID_DATA
;
// parse metric
smlParseTelnetElement
(
&
sql
,
&
tinfo
->
sTableName
,
&
tinfo
->
sTableNameLen
);
smlParseTelnetElement
(
&
sql
,
sqlEnd
,
&
tinfo
->
sTableName
,
&
tinfo
->
sTableNameLen
);
if
(
!
(
tinfo
->
sTableName
)
||
IS_INVALID_TABLE_LEN
(
tinfo
->
sTableNameLen
))
{
smlBuildInvalidDataMsg
(
&
info
->
msgBuf
,
"invalid data"
,
sql
);
return
TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH
;
...
...
@@ -1105,7 +1106,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable
// parse timestamp
const
char
*
timestamp
=
NULL
;
int32_t
tLen
=
0
;
smlParseTelnetElement
(
&
sql
,
&
timestamp
,
&
tLen
);
smlParseTelnetElement
(
&
sql
,
sqlEnd
,
&
timestamp
,
&
tLen
);
if
(
!
timestamp
||
tLen
==
0
)
{
smlBuildInvalidDataMsg
(
&
info
->
msgBuf
,
"invalid timestamp"
,
sql
);
return
TSDB_CODE_SML_INVALID_DATA
;
...
...
@@ -1120,7 +1121,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable
// parse value
const
char
*
value
=
NULL
;
int32_t
valueLen
=
0
;
smlParseTelnetElement
(
&
sql
,
&
value
,
&
valueLen
);
smlParseTelnetElement
(
&
sql
,
sqlEnd
,
&
value
,
&
valueLen
);
if
(
!
value
||
valueLen
==
0
)
{
smlBuildInvalidDataMsg
(
&
info
->
msgBuf
,
"invalid value"
,
sql
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
...
...
@@ -1138,7 +1139,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable
}
// parse tags
ret
=
smlParseTelnetTags
(
sql
,
tinfo
->
tags
,
tinfo
->
childTableName
,
info
->
dumplicateKey
,
&
info
->
msgBuf
);
ret
=
smlParseTelnetTags
(
sql
,
sqlEnd
,
tinfo
->
tags
,
tinfo
->
childTableName
,
info
->
dumplicateKey
,
&
info
->
msgBuf
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
smlBuildInvalidDataMsg
(
&
info
->
msgBuf
,
"invalid data"
,
sql
);
return
ret
;
...
...
@@ -2073,11 +2074,11 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
}
/************* TSDB_SML_JSON_PROTOCOL function end **************/
static
int32_t
smlParseInfluxLine
(
SSmlHandle
*
info
,
const
char
*
sql
)
{
static
int32_t
smlParseInfluxLine
(
SSmlHandle
*
info
,
const
char
*
sql
,
const
int
len
)
{
SSmlLineInfo
elements
=
{
0
};
uDebug
(
"SML:0x%"
PRIx64
" smlParseInfluxLine sql:%s, hello"
,
info
->
id
,
sql
);
int
ret
=
smlParseInfluxString
(
sql
,
&
elements
,
&
info
->
msgBuf
);
int
ret
=
smlParseInfluxString
(
sql
,
sql
+
len
,
&
elements
,
&
info
->
msgBuf
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" smlParseInfluxLine failed"
,
info
->
id
);
return
ret
;
...
...
@@ -2184,7 +2185,7 @@ static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
smlParseTelnetLine
(
SSmlHandle
*
info
,
void
*
data
)
{
static
int32_t
smlParseTelnetLine
(
SSmlHandle
*
info
,
void
*
data
,
const
int
len
)
{
int
ret
=
TSDB_CODE_SUCCESS
;
SSmlTableInfo
*
tinfo
=
smlBuildTableInfo
();
if
(
!
tinfo
)
{
...
...
@@ -2198,7 +2199,7 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) {
}
if
(
info
->
protocol
==
TSDB_SML_TELNET_PROTOCOL
)
{
ret
=
smlParseTelnetString
(
info
,
(
const
char
*
)
data
,
tinfo
,
cols
);
ret
=
smlParseTelnetString
(
info
,
(
const
char
*
)
data
,
(
char
*
)
data
+
len
,
tinfo
,
cols
);
}
else
if
(
info
->
protocol
==
TSDB_SML_JSON_PROTOCOL
)
{
ret
=
smlParseJSONString
(
info
,
(
cJSON
*
)
data
,
tinfo
,
cols
);
}
else
{
...
...
@@ -2289,7 +2290,7 @@ static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
for
(
int32_t
i
=
0
;
i
<
payloadNum
;
++
i
)
{
cJSON
*
dataPoint
=
(
payloadNum
==
1
&&
cJSON_IsObject
(
root
))
?
root
:
cJSON_GetArrayItem
(
root
,
i
);
ret
=
smlParseTelnetLine
(
info
,
dataPoint
);
ret
=
smlParseTelnetLine
(
info
,
dataPoint
,
-
1
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" Invalid JSON Payload"
,
info
->
id
);
goto
end
;
...
...
@@ -2378,10 +2379,14 @@ static void smlPrintStatisticInfo(SSmlHandle *info) {
info
->
cost
.
endTime
-
info
->
cost
.
insertRpcTime
,
info
->
cost
.
endTime
-
info
->
cost
.
parseTime
);
}
static
int32_t
smlParseLine
(
SSmlHandle
*
info
,
char
*
lines
[],
int
numLines
)
{
static
int32_t
smlParseLine
(
SSmlHandle
*
info
,
char
*
lines
[],
char
*
rawLine
,
char
*
rawLineEnd
,
int
numLines
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
info
->
protocol
==
TSDB_SML_JSON_PROTOCOL
)
{
code
=
smlParseJSON
(
info
,
*
lines
);
if
(
lines
){
code
=
smlParseJSON
(
info
,
*
lines
);
}
else
if
(
rawLine
){
code
=
smlParseJSON
(
info
,
rawLine
);
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" smlParseJSON failed:%s"
,
info
->
id
,
*
lines
);
return
code
;
...
...
@@ -2390,28 +2395,46 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) {
}
for
(
int32_t
i
=
0
;
i
<
numLines
;
++
i
)
{
char
*
tmp
=
NULL
;
int
len
=
0
;
if
(
lines
){
tmp
=
lines
[
i
];
len
=
strlen
(
tmp
);
}
else
if
(
rawLine
){
tmp
=
rawLine
;
while
(
rawLine
<
rawLineEnd
){
if
(
*
(
rawLine
++
)
==
'\n'
){
break
;
}
len
++
;
}
if
(
info
->
protocol
==
TSDB_SML_LINE_PROTOCOL
&&
tmp
[
0
]
==
'#'
){
// this line is comment
continue
;
}
}
if
(
info
->
protocol
==
TSDB_SML_LINE_PROTOCOL
)
{
code
=
smlParseInfluxLine
(
info
,
lines
[
i
]
);
code
=
smlParseInfluxLine
(
info
,
tmp
,
len
);
}
else
if
(
info
->
protocol
==
TSDB_SML_TELNET_PROTOCOL
)
{
code
=
smlParseTelnetLine
(
info
,
lines
[
i
]
);
code
=
smlParseTelnetLine
(
info
,
tmp
,
len
);
}
else
{
ASSERT
(
0
);
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" smlParseLine failed. line %d : %s"
,
info
->
id
,
i
,
lines
[
i
]
);
uError
(
"SML:0x%"
PRIx64
" smlParseLine failed. line %d : %s"
,
info
->
id
,
i
,
tmp
);
return
code
;
}
}
return
code
;
}
static
int
smlProcess
(
SSmlHandle
*
info
,
char
*
lines
[],
int
numLines
)
{
static
int
smlProcess
(
SSmlHandle
*
info
,
char
*
lines
[],
char
*
rawLine
,
char
*
rawLineEnd
,
int
numLines
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
retryNum
=
0
;
info
->
cost
.
parseTime
=
taosGetTimestampUs
();
code
=
smlParseLine
(
info
,
lines
,
numLines
);
code
=
smlParseLine
(
info
,
lines
,
rawLine
,
rawLineEnd
,
numLines
);
if
(
code
!=
0
)
{
uError
(
"SML:0x%"
PRIx64
" smlParseLine error : %s"
,
info
->
id
,
tstrerror
(
code
));
return
code
;
...
...
@@ -2504,39 +2527,8 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
smlDestroyInfo
(
info
);
}
/**
* taos_schemaless_insert() parse and insert data points into database according to
* different protocol.
*
* @param $lines input array may contain multiple lines, each line indicates a data point.
* If protocol=2 is used input array should contain single JSON
* string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
* multiple data points in JSON format, should include them in $JSON_string
* as a JSON array.
* @param $numLines indicates how many data points in $lines.
* If protocol = 2 is used this param will be ignored as $lines should
* contain single JSON string.
* @param $protocol indicates which protocol to use for parsing:
* 0 - influxDB line protocol
* 1 - OpenTSDB telnet line protocol
* 2 - OpenTSDB JSON format protocol
* @return return zero for successful insertion. Otherwise return none-zero error code of
* failure reason.
*
*/
TAOS_RES
*
taos_schemaless_insert
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
int
protocol
,
int
precision
)
{
if
(
NULL
==
taos
)
{
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
return
NULL
;
}
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
(
*
(
int64_t
*
)
taos
,
TSDB_SQL_INSERT
);
if
(
!
request
)
{
uError
(
"SML:taos_schemaless_insert error request is null"
);
return
NULL
;
}
TAOS_RES
*
taos_schemaless_insert_inner
(
SRequestObj
*
request
,
char
*
lines
[],
char
*
rawLine
,
char
*
rawLineEnd
,
int
numLines
,
int
protocol
,
int
precision
)
{
int
batchs
=
0
;
STscObj
*
pTscObj
=
request
->
pTscObj
;
...
...
@@ -2560,12 +2552,6 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
goto
end
;
}
if
(
!
lines
)
{
request
->
code
=
TSDB_CODE_SML_INVALID_DATA
;
smlBuildInvalidDataMsg
(
&
msg
,
"lines is null"
,
NULL
);
goto
end
;
}
if
(
protocol
<
TSDB_SML_LINE_PROTOCOL
||
protocol
>
TSDB_SML_JSON_PROTOCOL
)
{
request
->
code
=
TSDB_CODE_SML_INVALID_PROTOCOL_TYPE
;
smlBuildInvalidDataMsg
(
&
msg
,
"protocol invalidate"
,
NULL
);
...
...
@@ -2616,15 +2602,28 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
info
->
affectedRows
=
perBatch
;
info
->
pRequest
->
body
.
queryFp
=
smlInsertCallback
;
info
->
pRequest
->
body
.
param
=
info
;
int32_t
code
=
smlProcess
(
info
,
lines
,
perBatch
);
lines
+=
perBatch
;
int32_t
code
=
smlProcess
(
info
,
lines
,
rawLine
,
rawLineEnd
,
perBatch
);
if
(
lines
){
lines
+=
perBatch
;
}
if
(
rawLine
){
int
num
=
0
;
while
(
rawLine
<
rawLineEnd
){
if
(
*
(
rawLine
++
)
==
'\n'
){
num
++
;
}
if
(
num
==
perBatch
){
break
;
}
}
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
info
->
pRequest
->
body
.
queryFp
(
info
,
req
,
code
);
}
}
tsem_wait
(
&
params
.
sem
);
end:
end:
taosThreadSpinDestroy
(
&
params
.
lock
);
tsem_destroy
(
&
params
.
sem
);
// ((STscObj *)taos)->schemalessType = 0;
...
...
@@ -2632,3 +2631,80 @@ end:
uDebug
(
"resultend:%s"
,
request
->
msgBuf
);
return
(
TAOS_RES
*
)
request
;
}
/**
* taos_schemaless_insert() parse and insert data points into database according to
* different protocol.
*
* @param $lines input array may contain multiple lines, each line indicates a data point.
* If protocol=2 is used input array should contain single JSON
* string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
* multiple data points in JSON format, should include them in $JSON_string
* as a JSON array.
* @param $numLines indicates how many data points in $lines.
* If protocol = 2 is used this param will be ignored as $lines should
* contain single JSON string.
* @param $protocol indicates which protocol to use for parsing:
* 0 - influxDB line protocol
* 1 - OpenTSDB telnet line protocol
* 2 - OpenTSDB JSON format protocol
* @return return zero for successful insertion. Otherwise return none-zero error code of
* failure reason.
*
*/
TAOS_RES
*
taos_schemaless_insert
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
int
protocol
,
int
precision
)
{
if
(
NULL
==
taos
)
{
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
return
NULL
;
}
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
(
*
(
int64_t
*
)
taos
,
TSDB_SQL_INSERT
);
if
(
!
request
)
{
uError
(
"SML:taos_schemaless_insert error request is null"
);
return
NULL
;
}
if
(
!
lines
)
{
SSmlMsgBuf
msg
=
{
ERROR_MSG_BUF_DEFAULT_SIZE
,
request
->
msgBuf
};
request
->
code
=
TSDB_CODE_SML_INVALID_DATA
;
smlBuildInvalidDataMsg
(
&
msg
,
"lines is null"
,
NULL
);
return
(
TAOS_RES
*
)
request
;
}
return
taos_schemaless_insert_inner
(
request
,
lines
,
NULL
,
NULL
,
numLines
,
protocol
,
precision
);
}
TAOS_RES
*
taos_schemaless_insert_raw
(
TAOS
*
taos
,
char
*
lines
,
int
len
,
int32_t
*
totalRows
,
int
protocol
,
int
precision
){
if
(
NULL
==
taos
)
{
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
return
NULL
;
}
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
(
*
(
int64_t
*
)
taos
,
TSDB_SQL_INSERT
);
if
(
!
request
)
{
uError
(
"SML:taos_schemaless_insert error request is null"
);
return
NULL
;
}
if
(
!
lines
||
len
<=
0
)
{
SSmlMsgBuf
msg
=
{
ERROR_MSG_BUF_DEFAULT_SIZE
,
request
->
msgBuf
};
request
->
code
=
TSDB_CODE_SML_INVALID_DATA
;
smlBuildInvalidDataMsg
(
&
msg
,
"lines is null"
,
NULL
);
return
(
TAOS_RES
*
)
request
;
}
int
numLines
=
0
;
*
totalRows
=
0
;
char
*
tmp
=
lines
;
for
(
int
i
=
0
;
i
<
len
;
i
++
){
if
(
lines
[
i
]
==
'\n'
||
i
==
len
-
1
){
numLines
++
;
if
(
tmp
[
0
]
!=
'#'
||
protocol
!=
TSDB_SML_LINE_PROTOCOL
){
//ignore comment
(
*
totalRows
)
++
;
}
tmp
=
lines
+
i
+
1
;
}
}
return
taos_schemaless_insert_inner
(
request
,
NULL
,
lines
,
lines
+
len
,
numLines
,
protocol
,
precision
);
}
source/client/test/smlTest.cpp
浏览文件 @
25d6bbaf
...
...
@@ -44,7 +44,7 @@ TEST(testCase, smlParseInfluxString_Test) {
char
*
tmp
=
"
\\
,st,t1=3,t2=4,t3=t3 c1=3i64,c3=
\"
passit hello,c1=2
\"
,c2=false,c4=4f64 1626006833639000000 ,32,c=3"
;
char
*
sql
=
(
char
*
)
taosMemoryCalloc
(
256
,
1
);
memcpy
(
sql
,
tmp
,
strlen
(
tmp
)
+
1
);
int
ret
=
smlParseInfluxString
(
sql
,
&
elements
,
&
msgBuf
);
int
ret
=
smlParseInfluxString
(
sql
,
sql
+
strlen
(
sql
),
&
elements
,
&
msgBuf
);
ASSERT_EQ
(
ret
,
0
);
ASSERT_EQ
(
elements
.
measure
,
sql
);
ASSERT_EQ
(
elements
.
measureLen
,
strlen
(
",st"
));
...
...
@@ -63,14 +63,14 @@ TEST(testCase, smlParseInfluxString_Test) {
tmp
=
"st,t1=3,t2=4,t3=t3 c1=3i64,c3=
\"
passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"
;
memcpy
(
sql
,
tmp
,
strlen
(
tmp
)
+
1
);
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseInfluxString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParseInfluxString
(
sql
,
sql
+
strlen
(
sql
),
&
elements
,
&
msgBuf
);
ASSERT_NE
(
ret
,
0
);
// case 3 false
tmp
=
"st, t1=3,t2=4,t3=t3 c1=3i64,c3=
\"
passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"
;
memcpy
(
sql
,
tmp
,
strlen
(
tmp
)
+
1
);
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseInfluxString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParseInfluxString
(
sql
,
sql
+
strlen
(
sql
),
&
elements
,
&
msgBuf
);
ASSERT_EQ
(
ret
,
0
);
ASSERT_EQ
(
elements
.
cols
,
sql
+
elements
.
measureTagsLen
+
1
);
ASSERT_EQ
(
elements
.
colsLen
,
strlen
(
"t1=3,t2=4,t3=t3"
));
...
...
@@ -79,7 +79,7 @@ TEST(testCase, smlParseInfluxString_Test) {
tmp
=
"st, c1=3i64,c3=
\"
passit hello,c1=2
\"
,c2=false,c4=4f64 1626006833639000000"
;
memcpy
(
sql
,
tmp
,
strlen
(
tmp
)
+
1
);
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseInfluxString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParseInfluxString
(
sql
,
sql
+
strlen
(
sql
),
&
elements
,
&
msgBuf
);
ASSERT_EQ
(
ret
,
0
);
ASSERT_EQ
(
elements
.
measure
,
sql
);
ASSERT_EQ
(
elements
.
measureLen
,
strlen
(
"st"
));
...
...
@@ -98,7 +98,7 @@ TEST(testCase, smlParseInfluxString_Test) {
tmp
=
" st c1=3i64,c3=
\"
passit hello,c1=2
\"
,c2=false,c4=4f64 1626006833639000000 "
;
memcpy
(
sql
,
tmp
,
strlen
(
tmp
)
+
1
);
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseInfluxString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParseInfluxString
(
sql
,
sql
+
strlen
(
sql
),
&
elements
,
&
msgBuf
);
ASSERT_EQ
(
ret
,
0
);
ASSERT_EQ
(
elements
.
measure
,
sql
+
1
);
ASSERT_EQ
(
elements
.
measureLen
,
strlen
(
"st"
));
...
...
@@ -116,21 +116,21 @@ TEST(testCase, smlParseInfluxString_Test) {
tmp
=
" st c1=3i64,c3=
\"
passit hello,c1=2
\"
,c2=false,c4=4f64 "
;
memcpy
(
sql
,
tmp
,
strlen
(
tmp
)
+
1
);
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseInfluxString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParseInfluxString
(
sql
,
sql
+
strlen
(
sql
),
&
elements
,
&
msgBuf
);
ASSERT_EQ
(
ret
,
0
);
// case 7
tmp
=
" st , "
;
memcpy
(
sql
,
tmp
,
strlen
(
tmp
)
+
1
);
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseInfluxString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParseInfluxString
(
sql
,
sql
+
strlen
(
sql
),
&
elements
,
&
msgBuf
);
ASSERT_EQ
(
ret
,
0
);
// case 8 false
tmp
=
", st , "
;
memcpy
(
sql
,
tmp
,
strlen
(
tmp
)
+
1
);
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseInfluxString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParseInfluxString
(
sql
,
sql
+
strlen
(
sql
),
&
elements
,
&
msgBuf
);
ASSERT_NE
(
ret
,
0
);
taosMemoryFree
(
sql
);
}
...
...
@@ -542,7 +542,7 @@ TEST(testCase, smlParseTelnetLine_error_Test) {
"sys.procs.running 1479496100 42 host= web01"
,
};
for
(
int
i
=
0
;
i
<
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]);
i
++
)
{
int
ret
=
smlParseTelnetLine
(
info
,
(
void
*
)
sql
[
i
]);
int
ret
=
smlParseTelnetLine
(
info
,
(
void
*
)
sql
[
i
]
,
strlen
(
sql
[
i
])
);
ASSERT_NE
(
ret
,
0
);
}
...
...
@@ -561,7 +561,7 @@ TEST(testCase, smlParseTelnetLine_diff_type_Test) {
int
ret
=
TSDB_CODE_SUCCESS
;
for
(
int
i
=
0
;
i
<
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]);
i
++
)
{
ret
=
smlParseTelnetLine
(
info
,
(
void
*
)
sql
[
i
]);
ret
=
smlParseTelnetLine
(
info
,
(
void
*
)
sql
[
i
]
,
strlen
(
sql
[
i
])
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
break
;
}
ASSERT_NE
(
ret
,
0
);
...
...
@@ -617,7 +617,7 @@ TEST(testCase, smlParseTelnetLine_json_error_Test) {
int
ret
=
TSDB_CODE_SUCCESS
;
for
(
int
i
=
0
;
i
<
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]);
i
++
)
{
ret
=
smlParseTelnetLine
(
info
,
(
void
*
)
sql
[
i
]);
ret
=
smlParseTelnetLine
(
info
,
(
void
*
)
sql
[
i
]
,
strlen
(
sql
[
i
])
);
ASSERT_NE
(
ret
,
0
);
}
...
...
@@ -653,7 +653,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) {
int
ret
=
TSDB_CODE_SUCCESS
;
for
(
int
i
=
0
;
i
<
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]);
i
++
)
{
ret
=
smlParseTelnetLine
(
info
,
(
void
*
)
sql
[
i
]);
ret
=
smlParseTelnetLine
(
info
,
(
void
*
)
sql
[
i
]
,
strlen
(
sql
[
i
])
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
break
;
}
ASSERT_NE
(
ret
,
0
);
...
...
@@ -688,7 +688,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
};
int
ret
=
TSDB_CODE_SUCCESS
;
for
(
int
i
=
0
;
i
<
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]);
i
++
)
{
ret
=
smlParseTelnetLine
(
info
,
(
void
*
)
sql
[
i
]);
ret
=
smlParseTelnetLine
(
info
,
(
void
*
)
sql
[
i
]
,
strlen
(
sql
[
i
])
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
break
;
}
ASSERT_NE
(
ret
,
0
);
...
...
@@ -1002,7 +1002,7 @@ TEST(testCase, sml_col_4096_Test) {
int
ret
=
TSDB_CODE_SUCCESS
;
for
(
int
i
=
0
;
i
<
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]);
i
++
)
{
ret
=
smlParseInfluxLine
(
info
,
sql
[
i
]);
ret
=
smlParseInfluxLine
(
info
,
sql
[
i
]
,
strlen
(
sql
[
i
])
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
break
;
}
ASSERT_NE
(
ret
,
0
);
...
...
utils/test/c/sml_test.c
浏览文件 @
25d6bbaf
...
...
@@ -63,6 +63,7 @@ int smlProcess_influx_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
int
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -86,6 +87,8 @@ int smlProcess_telnet_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
int
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -125,6 +128,8 @@ int smlProcess_json1_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
int
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -165,6 +170,8 @@ int smlProcess_json2_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
int
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -233,6 +240,8 @@ int smlProcess_json3_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
int
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -292,6 +301,8 @@ int smlProcess_json4_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
int
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -313,6 +324,8 @@ int sml_TD15662_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
int
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -333,6 +346,8 @@ int sml_TD15742_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
int
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -362,6 +377,8 @@ int sml_16384_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -781,6 +798,8 @@ int sml_oom_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
int
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -825,6 +844,8 @@ int sml_16368_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
int
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -862,6 +883,8 @@ int sml_dup_time_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
int
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -1068,6 +1091,8 @@ int sml_16960_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
int
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -1097,6 +1122,7 @@ int sml_add_tag_col_Test() {
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
...
...
@@ -1151,6 +1177,36 @@ int smlProcess_18784_Test() {
rowIndex
++
;
}
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
int
sml_19221_Test
()
{
TAOS
*
taos
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
TAOS_RES
*
pRes
=
taos_query
(
taos
,
"create database if not exists sml_db schemaless 1"
);
taos_free_result
(
pRes
);
const
char
*
sql
[]
=
{
"qelhxo,id=pnnqhsa,t0=t,t1=127i8 c11=L
\"
ncharColValue
\"
,c0=t,c1=127i8 1626006833639000000
\n
qelhxo,id=pnnhsa,t0=t,t1=127i8 c11=L
\"
ncharColValue
\"
,c0=t,c1=127i8 1626006833639000000
\n
#comment
\n
qelhxo,id=pnqhsa,t0=t,t1=127i8 c11=L
\"
ncharColValue
\"
,c0=t,c1=127i8 1626006833639000000"
,
};
pRes
=
taos_query
(
taos
,
"use sml_db"
);
taos_free_result
(
pRes
);
char
*
tmp
=
(
char
*
)
taosMemoryCalloc
(
1024
,
1
);
memcpy
(
tmp
,
sql
[
0
],
strlen
(
sql
[
0
]));
*
(
char
*
)(
tmp
+
44
)
=
0
;
int32_t
totalRows
=
0
;
pRes
=
taos_schemaless_insert_raw
(
taos
,
tmp
,
strlen
(
sql
[
0
]),
&
totalRows
,
TSDB_SML_LINE_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
ASSERT
(
totalRows
==
3
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
int
code
=
taos_errno
(
pRes
);
taos_free_result
(
pRes
);
taos_close
(
taos
);
taosMemoryFree
(
tmp
);
return
code
;
}
...
...
@@ -1187,5 +1243,7 @@ int main(int argc, char *argv[]) {
ASSERT
(
!
ret
);
ret
=
smlProcess_18784_Test
();
ASSERT
(
!
ret
);
ret
=
sml_19221_Test
();
ASSERT
(
!
ret
);
return
ret
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录