Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b761d0a3
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
b761d0a3
编写于
9月 28, 2022
作者:
W
wade zhang
提交者:
GitHub
9月 28, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17044 from taosdata/feature/TD-19221
feat:add new interface for schemaless to support '\0' in line value
上级
5f9f3e7a
6ad5d7c0
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
226 addition
and
100 deletion
+226
-100
src/client/inc/tscParseLine.h
src/client/inc/tscParseLine.h
+2
-2
src/client/src/tscParseLineProtocol.c
src/client/src/tscParseLineProtocol.c
+130
-53
src/client/src/tscParseOpenTSDB.c
src/client/src/tscParseOpenTSDB.c
+93
-45
src/inc/taos.h
src/inc/taos.h
+1
-0
未找到文件。
src/client/inc/tscParseLine.h
浏览文件 @
b761d0a3
...
...
@@ -103,9 +103,9 @@ int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
void
destroySmlDataPoint
(
TAOS_SML_DATA_POINT
*
point
);
int
taos_insert_lines
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
SMLProtocolType
protocol
,
int
taos_insert_lines
(
TAOS
*
taos
,
char
*
data
,
int32_t
len
,
char
*
lines
[],
int
numLines
,
SMLProtocolType
protocol
,
SMLTimeStampType
tsType
,
int
*
affectedRows
);
int
taos_insert_telnet_lines
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
SMLProtocolType
protocol
,
int
taos_insert_telnet_lines
(
TAOS
*
taos
,
char
*
data
,
int32_t
len
,
char
*
lines
[],
int
numLines
,
SMLProtocolType
protocol
,
SMLTimeStampType
tsType
,
int
*
affectedRows
);
int
taos_insert_json_payload
(
TAOS
*
taos
,
char
*
payload
,
SMLProtocolType
protocol
,
SMLTimeStampType
tsType
,
int
*
affectedRows
);
...
...
src/client/src/tscParseLineProtocol.c
浏览文件 @
b761d0a3
...
...
@@ -577,7 +577,9 @@ static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STab
// Check if the table name available or not
if
(
tscValidateName
(
&
tableToken
,
true
,
&
dbIncluded
)
!=
TSDB_CODE_SUCCESS
)
{
code
=
TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH
;
if
(
pSql
->
cmd
.
payload
){
sprintf
(
pSql
->
cmd
.
payload
,
"table name is invalid"
);
}
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
return
code
;
}
...
...
@@ -1966,21 +1968,15 @@ int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
parseSmlTimeStamp
(
TAOS_SML_KV
**
pTS
,
const
char
**
idx
,
SSmlLinesInfo
*
info
)
{
const
char
*
start
,
*
cur
;
static
int32_t
parseSmlTimeStamp
(
TAOS_SML_KV
**
pTS
,
const
char
**
idx
,
int32_t
len
,
SSmlLinesInfo
*
info
)
{
const
char
*
start
;
int32_t
ret
=
TSDB_CODE_SUCCESS
;
int
len
=
0
;
char
key
[]
=
"ts"
;
char
*
value
=
NULL
;
start
=
cur
=
*
idx
;
start
=
*
idx
;
*
pTS
=
calloc
(
1
,
sizeof
(
TAOS_SML_KV
));
while
(
*
cur
!=
'\0'
)
{
cur
++
;
len
++
;
}
if
(
len
>
0
)
{
value
=
calloc
(
len
+
1
,
1
);
memcpy
(
value
,
start
,
len
);
...
...
@@ -2013,12 +2009,12 @@ bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
return
false
;
}
static
int32_t
parseSmlKey
(
TAOS_SML_KV
*
pKV
,
const
char
**
idx
,
SHashObj
*
pHash
,
SSmlLinesInfo
*
info
)
{
static
int32_t
parseSmlKey
(
TAOS_SML_KV
*
pKV
,
const
char
**
idx
,
int32_t
sqlLen
,
SHashObj
*
pHash
,
SSmlLinesInfo
*
info
)
{
const
char
*
cur
=
*
idx
;
char
key
[
TSDB_COL_NAME_LEN
+
1
];
// +1 to avoid key[len] over write
int16_t
len
=
0
;
while
(
*
cur
!=
'\0'
)
{
while
(
cur
-
*
idx
<
sqlLen
)
{
if
(
len
>
TSDB_COL_NAME_LEN
-
1
)
{
tscError
(
"SML:0x%"
PRIx64
" Key field cannot exceeds %d characters"
,
info
->
id
,
TSDB_COL_NAME_LEN
-
1
);
return
TSDB_CODE_TSC_INVALID_COLUMN_LENGTH
;
...
...
@@ -2053,7 +2049,7 @@ static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *pHash,
}
static
int32_t
parseSmlValue
(
TAOS_SML_KV
*
pKV
,
const
char
**
idx
,
static
int32_t
parseSmlValue
(
TAOS_SML_KV
*
pKV
,
const
char
**
idx
,
int32_t
sqlLen
,
bool
*
is_last_kv
,
SSmlLinesInfo
*
info
,
bool
isTag
)
{
const
char
*
start
,
*
cur
;
int32_t
ret
=
TSDB_CODE_SUCCESS
;
...
...
@@ -2163,13 +2159,13 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **idx,
cur
++
;
break
;
}
else
if
(
double_quote
==
true
)
{
if
(
*
cur
!=
' '
&&
*
cur
!=
','
&&
*
cur
!=
'\0'
)
{
if
(
*
cur
!=
' '
&&
*
cur
!=
','
&&
(
cur
-
*
idx
<
sqlLen
)
)
{
tscError
(
"SML:0x%"
PRIx64
" tag value: state(%d), incorrect character(%c) behind closing
\"
"
,
info
->
id
,
tag_state
,
*
cur
);
ret
=
TSDB_CODE_TSC_LINE_SYNTAX_ERROR
;
goto
error
;
}
if
(
*
cur
==
' '
||
*
cur
==
'\0'
)
{
if
(
*
cur
==
' '
||
(
cur
-
*
idx
==
sqlLen
)
)
{
*
is_last_kv
=
true
;
}
...
...
@@ -2308,13 +2304,13 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **idx,
cur
++
;
break
;
}
else
if
(
double_quote
==
true
)
{
if
(
*
cur
!=
' '
&&
*
cur
!=
','
&&
*
cur
!=
'\0'
)
{
if
(
*
cur
!=
' '
&&
*
cur
!=
','
&&
(
cur
-
*
idx
<
sqlLen
)
)
{
tscError
(
"SML:0x%"
PRIx64
" field value: state(%d), incorrect character(%c) behind closing
\"
"
,
info
->
id
,
val_state
,
*
cur
);
ret
=
TSDB_CODE_TSC_LINE_SYNTAX_ERROR
;
goto
error
;
}
if
(
*
cur
==
' '
||
*
cur
==
'\0'
)
{
if
(
*
cur
==
' '
||
(
cur
-
*
idx
==
sqlLen
)
)
{
*
is_last_kv
=
true
;
}
...
...
@@ -2384,7 +2380,7 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **idx,
}
free
(
value
);
*
idx
=
(
*
cur
==
'\0'
)
?
cur
:
cur
+
1
;
*
idx
=
(
cur
-
*
idx
==
sqlLen
)
?
cur
:
cur
+
1
;
return
ret
;
error:
...
...
@@ -2395,7 +2391,7 @@ error:
return
ret
;
}
static
int32_t
parseSmlMeasurement
(
TAOS_SML_DATA_POINT
*
pSml
,
const
char
**
idx
,
static
int32_t
parseSmlMeasurement
(
TAOS_SML_DATA_POINT
*
pSml
,
const
char
**
idx
,
int32_t
sqlLen
,
uint8_t
*
has_tags
,
SSmlLinesInfo
*
info
)
{
const
char
*
cur
=
*
idx
;
int16_t
len
=
0
;
...
...
@@ -2405,7 +2401,7 @@ static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **idx,
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
while
(
*
cur
!=
'\0'
)
{
while
(
cur
-
*
idx
<
sqlLen
)
{
if
(
len
>
TSDB_TABLE_NAME_LEN
-
1
)
{
tscError
(
"SML:0x%"
PRIx64
" Measurement field cannot exceeds %d characters"
,
info
->
id
,
TSDB_TABLE_NAME_LEN
-
1
);
free
(
pSml
->
stableName
);
...
...
@@ -2464,7 +2460,7 @@ int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* i
static
int32_t
parseSmlKvPairs
(
TAOS_SML_KV
**
pKVs
,
int
*
num_kvs
,
const
char
**
idx
,
bool
isField
,
const
char
**
idx
,
int32_t
len
,
bool
isField
,
TAOS_SML_DATA_POINT
*
smlData
,
SHashObj
*
pHash
,
SSmlLinesInfo
*
info
)
{
const
char
*
cur
=
*
idx
;
...
...
@@ -2495,13 +2491,13 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
addEscapeCharToString
(
childTableName
,
(
int32_t
)(
childTableNameLen
));
}
while
(
*
cur
!=
'\0'
)
{
ret
=
parseSmlKey
(
pkv
,
&
cur
,
pHash
,
info
);
while
(
cur
-
*
idx
<
len
)
{
ret
=
parseSmlKey
(
pkv
,
&
cur
,
len
-
(
cur
-
*
idx
),
pHash
,
info
);
if
(
ret
)
{
tscError
(
"SML:0x%"
PRIx64
" Unable to parse key"
,
info
->
id
);
goto
error
;
}
ret
=
parseSmlValue
(
pkv
,
&
cur
,
&
is_last_kv
,
info
,
!
isField
);
ret
=
parseSmlValue
(
pkv
,
&
cur
,
len
-
(
cur
-
*
idx
),
&
is_last_kv
,
info
,
!
isField
);
if
(
ret
)
{
tscError
(
"SML:0x%"
PRIx64
" Unable to parse value"
,
info
->
id
);
goto
error
;
...
...
@@ -2574,14 +2570,14 @@ static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *t
free
(
ts
);
}
int32_t
tscParseLine
(
const
char
*
sql
,
TAOS_SML_DATA_POINT
*
smlData
,
SSmlLinesInfo
*
info
)
{
int32_t
tscParseLine
(
const
char
*
sql
,
int32_t
len
,
TAOS_SML_DATA_POINT
*
smlData
,
SSmlLinesInfo
*
info
)
{
const
char
*
idx
=
sql
;
int32_t
ret
=
TSDB_CODE_SUCCESS
;
uint8_t
has_tags
=
0
;
TAOS_SML_KV
*
timestamp
=
NULL
;
SHashObj
*
keyHashTable
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
ret
=
parseSmlMeasurement
(
smlData
,
&
idx
,
&
has_tags
,
info
);
ret
=
parseSmlMeasurement
(
smlData
,
&
idx
,
len
,
&
has_tags
,
info
);
if
(
ret
)
{
tscError
(
"SML:0x%"
PRIx64
" Unable to parse measurement"
,
info
->
id
);
taosHashCleanup
(
keyHashTable
);
...
...
@@ -2591,7 +2587,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf
//Parse Tags
if
(
has_tags
)
{
ret
=
parseSmlKvPairs
(
&
smlData
->
tags
,
&
smlData
->
tagNum
,
&
idx
,
false
,
smlData
,
keyHashTable
,
info
);
ret
=
parseSmlKvPairs
(
&
smlData
->
tags
,
&
smlData
->
tagNum
,
&
idx
,
len
-
(
idx
-
sql
),
false
,
smlData
,
keyHashTable
,
info
);
if
(
ret
)
{
tscError
(
"SML:0x%"
PRIx64
" Unable to parse tag"
,
info
->
id
);
taosHashCleanup
(
keyHashTable
);
...
...
@@ -2601,7 +2597,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf
tscDebug
(
"SML:0x%"
PRIx64
" Parse tags finished, num of tags:%d"
,
info
->
id
,
smlData
->
tagNum
);
//Parse fields
ret
=
parseSmlKvPairs
(
&
smlData
->
fields
,
&
smlData
->
fieldNum
,
&
idx
,
true
,
smlData
,
keyHashTable
,
info
);
ret
=
parseSmlKvPairs
(
&
smlData
->
fields
,
&
smlData
->
fieldNum
,
&
idx
,
len
-
(
idx
-
sql
),
true
,
smlData
,
keyHashTable
,
info
);
if
(
ret
)
{
tscError
(
"SML:0x%"
PRIx64
" Unable to parse field"
,
info
->
id
);
taosHashCleanup
(
keyHashTable
);
...
...
@@ -2617,7 +2613,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf
taosHashCleanup
(
keyHashTable
);
//Parse timestamp
ret
=
parseSmlTimeStamp
(
&
timestamp
,
&
idx
,
info
);
ret
=
parseSmlTimeStamp
(
&
timestamp
,
&
idx
,
len
-
(
idx
-
sql
),
info
);
if
(
ret
)
{
tscError
(
"SML:0x%"
PRIx64
" Unable to parse timestamp"
,
info
->
id
);
return
ret
;
...
...
@@ -2652,24 +2648,58 @@ void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
free
(
point
->
childTableName
);
}
int32_t
tscParseLines
(
char
*
lines
[],
int
numLines
,
SArray
*
points
,
SArray
*
failedLines
,
SSmlLinesInfo
*
info
)
{
for
(
int32_t
i
=
0
;
i
<
numLines
;
++
i
)
{
static
int32_t
tscParseLinesInner
(
char
*
line
,
int32_t
len
,
SArray
*
points
,
SSmlLinesInfo
*
info
){
TAOS_SML_DATA_POINT
point
=
{
0
};
int32_t
code
=
tscParseLine
(
lines
[
i
]
,
&
point
,
info
);
int32_t
code
=
tscParseLine
(
line
,
len
,
&
point
,
info
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"SML:0x%"
PRIx64
" data point line parse failed. line %d : %s"
,
info
->
id
,
i
,
lines
[
i
]
);
tscError
(
"SML:0x%"
PRIx64
" data point line parse failed."
,
info
->
id
);
destroySmlDataPoint
(
&
point
);
return
code
;
}
else
{
tscDebug
(
"SML:0x%"
PRIx64
" data point line parse success. line %d"
,
info
->
id
,
i
);
tscDebug
(
"SML:0x%"
PRIx64
" data point line parse success."
,
info
->
id
);
}
taosArrayPush
(
points
,
&
point
);
}
return
TSDB_CODE_SUCCESS
;
}
int
taos_insert_lines
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
SMLProtocolType
protocol
,
SMLTimeStampType
tsType
,
int
*
affectedRows
)
{
int32_t
tscParseLines
(
char
*
data
,
int32_t
len
,
char
*
lines
[],
int
numLines
,
SArray
*
points
,
SArray
*
failedLines
,
SSmlLinesInfo
*
info
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
!
data
&&
lines
){
for
(
int32_t
i
=
0
;
i
<
numLines
;
++
i
)
{
code
=
tscParseLinesInner
(
lines
[
i
],
strlen
(
lines
[
i
]),
points
,
info
);
if
(
code
!=
TSDB_CODE_SUCCESS
){
return
code
;
}
}
}
else
if
(
data
&&
!
lines
){
char
*
tmp
=
data
;
int32_t
lenTmp
=
0
;
for
(
int
i
=
0
;
i
<
len
;
i
++
){
if
(
data
[
i
]
==
'\n'
||
i
==
len
-
1
){
if
(
data
[
i
]
!=
'\n'
||
i
==
len
-
1
){
lenTmp
++
;
}
if
(
lenTmp
>
0
)
{
code
=
tscParseLinesInner
(
tmp
,
lenTmp
,
points
,
info
);
if
(
code
!=
TSDB_CODE_SUCCESS
){
return
code
;
}
}
if
(
i
<
len
-
1
)
{
tmp
=
data
+
i
+
1
;
}
lenTmp
=
0
;
}
else
{
lenTmp
++
;
}
}
}
return
code
;
}
int
taos_insert_lines
(
TAOS
*
taos
,
char
*
data
,
int
len
,
char
*
lines
[],
int
numLines
,
SMLProtocolType
protocol
,
SMLTimeStampType
tsType
,
int
*
affectedRows
)
{
int32_t
code
=
0
;
SSmlLinesInfo
*
info
=
tcalloc
(
1
,
sizeof
(
SSmlLinesInfo
));
...
...
@@ -2677,6 +2707,18 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p
info
->
tsType
=
tsType
;
info
->
protocol
=
protocol
;
if
(
data
){
numLines
=
0
;
for
(
int
i
=
0
;
i
<
len
;
i
++
){
if
(
data
[
i
]
==
'\0'
){
data
[
i
]
=
'0'
;
}
if
(
data
[
i
]
==
'\n'
||
i
==
len
-
1
){
numLines
++
;
}
}
}
if
(
numLines
<=
0
||
numLines
>
65536
*
32
)
{
tscError
(
"SML:0x%"
PRIx64
" taos_insert_lines numLines should be between 1 and 65536*32. numLines: %d"
,
info
->
id
,
numLines
);
tfree
(
info
);
...
...
@@ -2684,6 +2726,7 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p
return
code
;
}
if
(
lines
){
for
(
int
i
=
0
;
i
<
numLines
;
++
i
)
{
if
(
lines
[
i
]
==
NULL
)
{
tscError
(
"SML:0x%"
PRIx64
" taos_insert_lines line %d is NULL"
,
info
->
id
,
i
);
...
...
@@ -2692,6 +2735,7 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p
return
code
;
}
}
}
SArray
*
lpPoints
=
taosArrayInit
(
numLines
,
sizeof
(
TAOS_SML_DATA_POINT
));
if
(
lpPoints
==
NULL
)
{
...
...
@@ -2700,8 +2744,8 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
tscDebug
(
"SML:0x%"
PRIx64
" taos_insert_lines begin inserting %d lines
, first line: %s"
,
info
->
id
,
numLines
,
lines
[
0
]
);
code
=
tscParseLines
(
lines
,
numLines
,
lpPoints
,
NULL
,
info
);
tscDebug
(
"SML:0x%"
PRIx64
" taos_insert_lines begin inserting %d lines
"
,
info
->
id
,
numLines
);
code
=
tscParseLines
(
data
,
len
,
lines
,
numLines
,
lpPoints
,
NULL
,
info
);
size_t
numPoints
=
taosArrayGetSize
(
lpPoints
);
if
(
code
!=
0
)
{
...
...
@@ -2816,10 +2860,10 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
switch
(
protocol
)
{
case
TSDB_SML_LINE_PROTOCOL
:
code
=
taos_insert_lines
(
taos
,
lines
,
numLines
,
protocol
,
tsType
,
&
affected_rows
);
code
=
taos_insert_lines
(
taos
,
NULL
,
0
,
lines
,
numLines
,
protocol
,
tsType
,
&
affected_rows
);
break
;
case
TSDB_SML_TELNET_PROTOCOL
:
code
=
taos_insert_telnet_lines
(
taos
,
lines
,
numLines
,
protocol
,
tsType
,
&
affected_rows
);
code
=
taos_insert_telnet_lines
(
taos
,
NULL
,
0
,
lines
,
numLines
,
protocol
,
tsType
,
&
affected_rows
);
break
;
case
TSDB_SML_JSON_PROTOCOL
:
code
=
taos_insert_json_payload
(
taos
,
*
lines
,
protocol
,
tsType
,
&
affected_rows
);
...
...
@@ -2830,6 +2874,39 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
}
SSqlObj
*
pSql
=
createSmlQueryObj
(
taos
,
affected_rows
,
code
);
return
(
TAOS_RES
*
)
pSql
;
}
TAOS_RES
*
taos_schemaless_insert_new
(
TAOS
*
taos
,
char
*
lines
,
int
len
,
int32_t
*
totalRows
,
int
protocol
,
int
precision
){
int
code
=
TSDB_CODE_SUCCESS
;
int
affected_rows
=
0
;
SMLTimeStampType
tsType
=
SML_TIME_STAMP_NOW
;
if
(
protocol
==
TSDB_SML_LINE_PROTOCOL
)
{
code
=
convertPrecisionType
(
precision
,
&
tsType
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
}
switch
(
protocol
)
{
case
TSDB_SML_LINE_PROTOCOL
:
code
=
taos_insert_lines
(
taos
,
lines
,
len
,
NULL
,
0
,
protocol
,
tsType
,
&
affected_rows
);
break
;
case
TSDB_SML_TELNET_PROTOCOL
:
code
=
taos_insert_telnet_lines
(
taos
,
lines
,
len
,
NULL
,
0
,
protocol
,
tsType
,
&
affected_rows
);
break
;
case
TSDB_SML_JSON_PROTOCOL
:
code
=
taos_insert_json_payload
(
taos
,
lines
,
protocol
,
tsType
,
&
affected_rows
);
break
;
default:
code
=
TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE
;
break
;
}
SSqlObj
*
pSql
=
createSmlQueryObj
(
taos
,
affected_rows
,
code
);
return
(
TAOS_RES
*
)
pSql
;
...
...
src/client/src/tscParseOpenTSDB.c
浏览文件 @
b761d0a3
...
...
@@ -33,7 +33,7 @@ static uint64_t genUID() {
return
id
;
}
static
int32_t
parseTelnetMetric
(
TAOS_SML_DATA_POINT
*
pSml
,
const
char
**
idx
,
SSmlLinesInfo
*
info
)
{
static
int32_t
parseTelnetMetric
(
TAOS_SML_DATA_POINT
*
pSml
,
const
char
**
idx
,
int32_t
sqlLen
,
SSmlLinesInfo
*
info
)
{
const
char
*
cur
=
*
idx
;
uint16_t
len
=
0
;
...
...
@@ -49,7 +49,7 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **idx, SS
}
*/
while
(
*
cur
!=
'\0'
)
{
while
(
cur
-
*
idx
<
sqlLen
)
{
if
(
len
>
TSDB_TABLE_NAME_LEN
-
1
)
{
tscError
(
"OTD:0x%"
PRIx64
" Metric cannot exceeds %d characters"
,
info
->
id
,
TSDB_TABLE_NAME_LEN
-
1
);
tfree
(
pSml
->
stableName
);
...
...
@@ -82,7 +82,7 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **idx, SS
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
parseTelnetTimeStamp
(
TAOS_SML_KV
**
pTS
,
int
*
num_kvs
,
const
char
**
idx
,
SSmlLinesInfo
*
info
)
{
static
int32_t
parseTelnetTimeStamp
(
TAOS_SML_KV
**
pTS
,
int
*
num_kvs
,
const
char
**
idx
,
int32_t
sqlLen
,
SSmlLinesInfo
*
info
)
{
//Timestamp must be the first KV to parse
assert
(
*
num_kvs
==
0
);
...
...
@@ -96,7 +96,7 @@ static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char
//allocate fields for timestamp and value
*
pTS
=
tcalloc
(
OTD_MAX_FIELDS_NUM
,
sizeof
(
TAOS_SML_KV
));
while
(
*
cur
!=
'\0'
)
{
while
(
cur
-
*
idx
<
sqlLen
)
{
if
(
*
cur
==
' '
)
{
if
(
*
(
cur
+
1
)
!=
' '
)
{
break
;
...
...
@@ -109,7 +109,7 @@ static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char
len
++
;
}
if
(
len
>
0
&&
*
cur
!=
'\0'
)
{
if
(
len
>
0
&&
cur
-
*
idx
<
sqlLen
)
{
value
=
tcalloc
(
len
+
1
,
1
);
memcpy
(
value
,
start
,
len
);
}
else
{
...
...
@@ -135,7 +135,7 @@ static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char
return
ret
;
}
static
int32_t
parseTelnetMetricValue
(
TAOS_SML_KV
**
pKVs
,
int
*
num_kvs
,
const
char
**
idx
,
SSmlLinesInfo
*
info
)
{
static
int32_t
parseTelnetMetricValue
(
TAOS_SML_KV
**
pKVs
,
int
*
num_kvs
,
const
char
**
idx
,
int32_t
sqlLen
,
SSmlLinesInfo
*
info
)
{
//skip timestamp
TAOS_SML_KV
*
pVal
=
*
pKVs
+
1
;
const
char
*
start
,
*
cur
;
...
...
@@ -158,7 +158,7 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch
len
+=
2
;
}
while
(
*
cur
!=
'\0'
)
{
while
(
cur
-
*
idx
<
sqlLen
)
{
if
(
*
cur
==
' '
)
{
if
(
searchQuote
==
true
)
{
if
(
*
(
cur
-
1
)
==
'"'
&&
len
!=
1
&&
len
!=
2
)
{
...
...
@@ -181,7 +181,7 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch
len
++
;
}
if
(
len
>
0
&&
*
cur
!=
'\0'
)
{
if
(
len
>
0
&&
cur
-
*
idx
<
sqlLen
)
{
value
=
tcalloc
(
len
+
1
,
1
);
memcpy
(
value
,
start
,
len
);
}
else
{
...
...
@@ -205,7 +205,7 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch
return
ret
;
}
static
int32_t
parseTelnetTagKey
(
TAOS_SML_KV
*
pKV
,
const
char
**
idx
,
SHashObj
*
pHash
,
SSmlLinesInfo
*
info
)
{
static
int32_t
parseTelnetTagKey
(
TAOS_SML_KV
*
pKV
,
const
char
**
idx
,
int32_t
sqlLen
,
SHashObj
*
pHash
,
SSmlLinesInfo
*
info
)
{
const
char
*
cur
=
*
idx
;
char
key
[
TSDB_COL_NAME_LEN
];
uint16_t
len
=
0
;
...
...
@@ -215,7 +215,7 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *p
// tscError("OTD:0x%"PRIx64" Tag key cannot start with digit", info->id);
// return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
//}
while
(
*
cur
!=
'\0'
)
{
while
(
cur
-
*
idx
<
sqlLen
)
{
if
(
len
>
TSDB_COL_NAME_LEN
-
1
)
{
tscError
(
"OTD:0x%"
PRIx64
" Tag key cannot exceeds %d characters"
,
info
->
id
,
TSDB_COL_NAME_LEN
-
1
);
return
TSDB_CODE_TSC_INVALID_COLUMN_LENGTH
;
...
...
@@ -231,7 +231,7 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *p
cur
++
;
len
++
;
}
if
(
len
==
0
||
*
cur
==
'\0'
)
{
if
(
len
==
0
||
cur
-
*
idx
==
sqlLen
)
{
return
TSDB_CODE_TSC_LINE_SYNTAX_ERROR
;
}
key
[
len
]
=
'\0'
;
...
...
@@ -249,7 +249,7 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *p
}
static
int32_t
parseTelnetTagValue
(
TAOS_SML_KV
*
pKV
,
const
char
**
idx
,
static
int32_t
parseTelnetTagValue
(
TAOS_SML_KV
*
pKV
,
const
char
**
idx
,
int32_t
sqlLen
,
bool
*
is_last_kv
,
SSmlLinesInfo
*
info
)
{
const
char
*
start
,
*
cur
;
char
*
value
=
NULL
;
...
...
@@ -258,9 +258,9 @@ static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **idx,
while
(
1
)
{
// whitespace or '\0' identifies a value
if
(
*
cur
==
' '
||
*
cur
==
'\0'
)
{
if
(
*
cur
==
' '
||
cur
-
*
idx
==
sqlLen
)
{
// '\0' indicates end of value
*
is_last_kv
=
(
*
cur
==
'\0'
)
?
true
:
false
;
*
is_last_kv
=
(
cur
-
*
idx
==
sqlLen
)
?
true
:
false
;
if
(
*
cur
==
' '
&&
*
(
cur
+
1
)
==
' '
)
{
cur
++
;
continue
;
...
...
@@ -290,12 +290,12 @@ static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **idx,
}
tfree
(
value
);
*
idx
=
(
*
cur
==
'\0'
)
?
cur
:
cur
+
1
;
*
idx
=
(
cur
-
*
idx
==
sqlLen
)
?
cur
:
cur
+
1
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
parseTelnetTagKvs
(
TAOS_SML_KV
**
pKVs
,
int
*
num_kvs
,
const
char
**
idx
,
char
**
childTableName
,
const
char
**
idx
,
int32_t
sqlLen
,
char
**
childTableName
,
SHashObj
*
pHash
,
SSmlLinesInfo
*
info
)
{
const
char
*
cur
=
*
idx
;
int32_t
ret
=
TSDB_CODE_SUCCESS
;
...
...
@@ -312,13 +312,13 @@ static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
memcpy
(
childTbName
,
tsSmlChildTableName
,
childTableNameLen
);
addEscapeCharToString
(
childTbName
,
(
int32_t
)(
childTableNameLen
));
}
while
(
*
cur
!=
'\0'
)
{
ret
=
parseTelnetTagKey
(
pkv
,
&
cur
,
pHash
,
info
);
while
(
cur
-
*
idx
<
sqlLen
)
{
ret
=
parseTelnetTagKey
(
pkv
,
&
cur
,
sqlLen
-
(
cur
-
*
idx
),
pHash
,
info
);
if
(
ret
)
{
tscError
(
"OTD:0x%"
PRIx64
" Unable to parse key"
,
info
->
id
);
return
ret
;
}
ret
=
parseTelnetTagValue
(
pkv
,
&
cur
,
&
is_last_kv
,
info
);
ret
=
parseTelnetTagValue
(
pkv
,
&
cur
,
sqlLen
-
(
cur
-
*
idx
),
&
is_last_kv
,
info
);
if
(
ret
)
{
tscError
(
"OTD:0x%"
PRIx64
" Unable to parse value"
,
info
->
id
);
return
ret
;
...
...
@@ -356,12 +356,12 @@ static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
return
ret
;
}
static
int32_t
tscParseTelnetLine
(
const
char
*
line
,
TAOS_SML_DATA_POINT
*
smlData
,
SSmlLinesInfo
*
info
)
{
static
int32_t
tscParseTelnetLine
(
const
char
*
line
,
int32_t
len
,
TAOS_SML_DATA_POINT
*
smlData
,
SSmlLinesInfo
*
info
)
{
const
char
*
idx
=
line
;
int32_t
ret
=
TSDB_CODE_SUCCESS
;
//Parse metric
ret
=
parseTelnetMetric
(
smlData
,
&
idx
,
info
);
ret
=
parseTelnetMetric
(
smlData
,
&
idx
,
len
,
info
);
if
(
ret
)
{
tscError
(
"OTD:0x%"
PRIx64
" Unable to parse metric"
,
info
->
id
);
return
ret
;
...
...
@@ -369,7 +369,7 @@ static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData
tscDebug
(
"OTD:0x%"
PRIx64
" Parse metric finished"
,
info
->
id
);
//Parse timestamp
ret
=
parseTelnetTimeStamp
(
&
smlData
->
fields
,
&
smlData
->
fieldNum
,
&
idx
,
info
);
ret
=
parseTelnetTimeStamp
(
&
smlData
->
fields
,
&
smlData
->
fieldNum
,
&
idx
,
len
-
(
idx
-
line
),
info
);
if
(
ret
)
{
tscError
(
"OTD:0x%"
PRIx64
" Unable to parse timestamp"
,
info
->
id
);
return
ret
;
...
...
@@ -377,7 +377,7 @@ static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData
tscDebug
(
"OTD:0x%"
PRIx64
" Parse timestamp finished"
,
info
->
id
);
//Parse value
ret
=
parseTelnetMetricValue
(
&
smlData
->
fields
,
&
smlData
->
fieldNum
,
&
idx
,
info
);
ret
=
parseTelnetMetricValue
(
&
smlData
->
fields
,
&
smlData
->
fieldNum
,
&
idx
,
len
-
(
idx
-
line
),
info
);
if
(
ret
)
{
tscError
(
"OTD:0x%"
PRIx64
" Unable to parse metric value"
,
info
->
id
);
return
ret
;
...
...
@@ -386,7 +386,7 @@ static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData
//Parse tagKVs
SHashObj
*
keyHashTable
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
ret
=
parseTelnetTagKvs
(
&
smlData
->
tags
,
&
smlData
->
tagNum
,
&
idx
,
&
smlData
->
childTableName
,
keyHashTable
,
info
);
ret
=
parseTelnetTagKvs
(
&
smlData
->
tags
,
&
smlData
->
tagNum
,
&
idx
,
len
-
(
idx
-
line
),
&
smlData
->
childTableName
,
keyHashTable
,
info
);
if
(
ret
)
{
tscError
(
"OTD:0x%"
PRIx64
" Unable to parse tags"
,
info
->
id
);
taosHashCleanup
(
keyHashTable
);
...
...
@@ -399,24 +399,58 @@ static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tscParseTelnetLines
(
char
*
lines
[],
int
numLines
,
SArray
*
points
,
SArray
*
failedLines
,
SSmlLinesInfo
*
info
)
{
for
(
int32_t
i
=
0
;
i
<
numLines
;
++
i
)
{
static
int32_t
tscParseTelnetLinesInner
(
char
*
data
,
int32_t
len
,
SArray
*
points
,
SSmlLinesInfo
*
info
)
{
TAOS_SML_DATA_POINT
point
=
{
0
};
int32_t
code
=
tscParseTelnetLine
(
lines
[
i
]
,
&
point
,
info
);
int32_t
code
=
tscParseTelnetLine
(
data
,
len
,
&
point
,
info
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"OTD:0x%"
PRIx64
" data point line parse failed. line %d : %s"
,
info
->
id
,
i
,
lines
[
i
]
);
tscError
(
"OTD:0x%"
PRIx64
" data point line parse failed."
,
info
->
id
);
destroySmlDataPoint
(
&
point
);
return
code
;
}
else
{
tscDebug
(
"OTD:0x%"
PRIx64
" data point line parse success. line %d"
,
info
->
id
,
i
);
tscDebug
(
"OTD:0x%"
PRIx64
" data point line parse success."
,
info
->
id
);
}
taosArrayPush
(
points
,
&
point
);
}
return
TSDB_CODE_SUCCESS
;
}
int
taos_insert_telnet_lines
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
SMLProtocolType
protocol
,
SMLTimeStampType
tsType
,
int
*
affectedRows
)
{
static
int32_t
tscParseTelnetLines
(
char
*
data
,
int32_t
len
,
char
*
lines
[],
int
numLines
,
SArray
*
points
,
SArray
*
failedLines
,
SSmlLinesInfo
*
info
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
!
data
&&
lines
){
for
(
int32_t
i
=
0
;
i
<
numLines
;
++
i
)
{
code
=
tscParseTelnetLinesInner
(
lines
[
i
],
strlen
(
lines
[
i
]),
points
,
info
);
if
(
code
!=
TSDB_CODE_SUCCESS
){
return
code
;
}
}
}
else
if
(
data
&&
!
lines
){
char
*
tmp
=
data
;
int32_t
lenTmp
=
0
;
for
(
int
i
=
0
;
i
<
len
;
i
++
){
if
(
data
[
i
]
==
'\n'
||
i
==
len
-
1
){
if
(
data
[
i
]
!=
'\n'
||
i
==
len
-
1
){
lenTmp
++
;
}
if
(
lenTmp
>
0
)
{
code
=
tscParseTelnetLinesInner
(
tmp
,
lenTmp
,
points
,
info
);
if
(
code
!=
TSDB_CODE_SUCCESS
){
return
code
;
}
}
if
(
i
<
len
-
1
)
{
tmp
=
data
+
i
+
1
;
}
lenTmp
=
0
;
}
else
{
lenTmp
++
;
}
}
}
return
code
;
}
int
taos_insert_telnet_lines
(
TAOS
*
taos
,
char
*
data
,
int32_t
len
,
char
*
lines
[],
int
numLines
,
SMLProtocolType
protocol
,
SMLTimeStampType
tsType
,
int
*
affectedRows
)
{
int32_t
code
=
0
;
SSmlLinesInfo
*
info
=
tcalloc
(
1
,
sizeof
(
SSmlLinesInfo
));
...
...
@@ -424,6 +458,18 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtoco
info
->
tsType
=
tsType
;
info
->
protocol
=
protocol
;
if
(
data
&&
!
lines
){
numLines
=
0
;
for
(
int
i
=
0
;
i
<
len
;
i
++
){
if
(
data
[
i
]
==
'\0'
){
data
[
i
]
=
'0'
;
}
if
(
data
[
i
]
==
'\n'
||
i
==
len
-
1
){
numLines
++
;
}
}
}
if
(
numLines
<=
0
||
numLines
>
65536
)
{
tscError
(
"OTD:0x%"
PRIx64
" taos_insert_telnet_lines numLines should be between 1 and 65536. numLines: %d"
,
info
->
id
,
numLines
);
tfree
(
info
);
...
...
@@ -431,6 +477,7 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtoco
return
code
;
}
if
(
!
data
&&
lines
){
for
(
int
i
=
0
;
i
<
numLines
;
++
i
)
{
if
(
lines
[
i
]
==
NULL
)
{
tscError
(
"OTD:0x%"
PRIx64
" taos_insert_telnet_lines line %d is NULL"
,
info
->
id
,
i
);
...
...
@@ -439,6 +486,7 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtoco
return
code
;
}
}
}
SArray
*
lpPoints
=
taosArrayInit
(
numLines
,
sizeof
(
TAOS_SML_DATA_POINT
));
if
(
lpPoints
==
NULL
)
{
...
...
@@ -448,7 +496,7 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtoco
}
tscDebug
(
"OTD:0x%"
PRIx64
" taos_insert_telnet_lines begin inserting %d lines, first line: %s"
,
info
->
id
,
numLines
,
lines
[
0
]);
code
=
tscParseTelnetLines
(
lines
,
numLines
,
lpPoints
,
NULL
,
info
);
code
=
tscParseTelnetLines
(
data
,
len
,
lines
,
numLines
,
lpPoints
,
NULL
,
info
);
size_t
numPoints
=
taosArrayGetSize
(
lpPoints
);
if
(
code
!=
0
)
{
...
...
src/inc/taos.h
浏览文件 @
b761d0a3
...
...
@@ -215,6 +215,7 @@ DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
DLL_EXPORT
int
taos_load_table_info
(
TAOS
*
taos
,
const
char
*
tableNameList
);
DLL_EXPORT
TAOS_RES
*
taos_schemaless_insert
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
int
protocol
,
int
precision
);
DLL_EXPORT
TAOS_RES
*
taos_schemaless_insert_new
(
TAOS
*
taos
,
char
*
lines
,
int
len
,
int32_t
*
totalRows
,
int
protocol
,
int
precision
);
DLL_EXPORT
int32_t
taos_parse_time
(
char
*
timestr
,
int64_t
*
time
,
int32_t
len
,
int32_t
timePrec
,
int8_t
dayligth
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录