Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5be1b4e6
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
5be1b4e6
编写于
5月 31, 2022
作者:
M
Minglei Jin
提交者:
GitHub
5月 31, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13300 from taosdata/fix/TS-1545-V24
fix(client): V24 line protocol sync from develop
上级
4cbe6457
0c23efb0
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
278 addition
and
365 deletion
+278
-365
src/client/inc/tscParseLine.h
src/client/inc/tscParseLine.h
+22
-0
src/client/src/tscParseLineProtocol.c
src/client/src/tscParseLineProtocol.c
+256
-365
未找到文件。
src/client/inc/tscParseLine.h
浏览文件 @
5be1b4e6
...
...
@@ -58,6 +58,22 @@ typedef enum {
SML_TIME_STAMP_NOW
}
SMLTimeStampType
;
typedef
struct
SSmlSqlInsertBatch
{
uint64_t
id
;
int32_t
index
;
char
*
sql
;
int32_t
code
;
int32_t
tryTimes
;
tsem_t
sem
;
int32_t
affectedRows
;
bool
tryAgain
;
bool
resetQueryCache
;
bool
sleep
;
}
SSmlSqlInsertBatch
;
#define MAX_SML_SQL_INSERT_BATCHES 512
typedef
struct
{
uint64_t
id
;
SMLProtocolType
protocol
;
...
...
@@ -65,7 +81,13 @@ typedef struct {
SHashObj
*
smlDataToSchema
;
int32_t
affectedRows
;
pthread_mutex_t
batchMutex
;
pthread_cond_t
batchCond
;
int32_t
numBatches
;
SSmlSqlInsertBatch
batches
[
MAX_SML_SQL_INSERT_BATCHES
];
}
SSmlLinesInfo
;
char
*
addEscapeCharToString
(
char
*
str
,
int32_t
len
);
int
tscSmlInsert
(
TAOS
*
taos
,
TAOS_SML_DATA_POINT
*
points
,
int
numPoint
,
SSmlLinesInfo
*
info
);
bool
checkDuplicateKey
(
char
*
key
,
SHashObj
*
pHash
,
SSmlLinesInfo
*
info
);
...
...
src/client/src/tscParseLineProtocol.c
浏览文件 @
5be1b4e6
...
...
@@ -32,10 +32,6 @@ typedef struct {
static
uint64_t
linesSmlHandleId
=
0
;
static
int32_t
insertChildTablePointsBatch
(
void
*
pVoid
,
char
*
name
,
char
*
name1
,
SArray
*
pArray
,
SArray
*
pArray1
,
SArray
*
pArray2
,
SArray
*
pArray3
,
size_t
size
,
SSmlLinesInfo
*
info
);
static
int32_t
doInsertChildTablePoints
(
void
*
pVoid
,
char
*
sql
,
char
*
name
,
SArray
*
pArray
,
SArray
*
pArray1
,
SSmlLinesInfo
*
info
);
uint64_t
genLinesSmlId
()
{
uint64_t
id
;
...
...
@@ -91,16 +87,17 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes, uint64_t
*
bytes
=
tDataTypes
[
kv
->
type
].
bytes
;
}
else
{
if
(
kv
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
char
*
ucs
=
malloc
(
kv
->
length
*
TSDB_NCHAR_SIZE
+
1
);
int32_t
bytesNeeded
=
0
;
bool
succ
=
taosMbsToUcs4
(
kv
->
value
,
kv
->
length
,
ucs
,
kv
->
length
*
TSDB_NCHAR_SIZE
,
&
bytesNeeded
);
if
(
!
succ
)
{
free
(
ucs
);
tscError
(
"SML:0x%"
PRIx64
" convert nchar string to UCS4_LE failed:%s"
,
id
,
kv
->
value
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
free
(
ucs
);
*
bytes
=
bytesNeeded
+
VARSTR_HEADER_SIZE
;
// char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1);
// int32_t bytesNeeded = 0;
// bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded);
// if (!succ) {
// free(ucs);
// tscError("SML:0x%"PRIx64" convert nchar string to UCS4_LE failed:%s", id, kv->value);
// return TSDB_CODE_TSC_INVALID_VALUE;
// }
// free(ucs);
// *bytes = bytesNeeded + VARSTR_HEADER_SIZE;
*
bytes
=
kv
->
length
*
TSDB_NCHAR_SIZE
+
VARSTR_HEADER_SIZE
;
}
else
if
(
kv
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
*
bytes
=
kv
->
length
+
VARSTR_HEADER_SIZE
;
}
...
...
@@ -792,9 +789,23 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
return
0
;
}
static
int32_t
applyChildTableDataPointsWithInsertSQL
(
TAOS
*
taos
,
char
*
cTableName
,
char
*
sTableName
,
SSmlSTableSchema
*
sTableSchema
,
SArray
*
cTablePoints
,
size_t
rowSize
,
SSmlLinesInfo
*
info
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
static
int
smlSnprintf
(
char
*
buf
,
int32_t
*
total
,
int32_t
cap
,
char
*
fmt
,
...)
{
if
(
*
total
>
cap
)
{
return
-
1
;
}
va_list
argp
;
va_start
(
argp
,
fmt
);
int
len
=
vsnprintf
(
buf
+
*
total
,
cap
-
*
total
,
fmt
,
argp
);
if
(
len
<
0
||
len
>=
cap
-
*
total
)
{
return
-
2
;
}
*
total
+=
len
;
return
0
;
}
static
int32_t
addChildTableDataPointsToInsertSql
(
char
*
cTableName
,
char
*
sTableName
,
SSmlSTableSchema
*
sTableSchema
,
SArray
*
cTablePoints
,
char
*
sql
,
int32_t
capacity
,
int32_t
*
cTableSqlLen
,
int
fromIndex
,
int
*
nextIndex
,
SSmlLinesInfo
*
info
)
{
size_t
numTags
=
taosArrayGetSize
(
sTableSchema
->
tags
);
size_t
numCols
=
taosArrayGetSize
(
sTableSchema
->
fields
);
size_t
rows
=
taosArrayGetSize
(
cTablePoints
);
...
...
@@ -810,53 +821,79 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa
}
}
char
*
sql
=
malloc
(
tsMaxSQLStringLen
+
1
);
if
(
sql
==
NULL
)
{
tscError
(
"malloc sql memory error"
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
TAOS_SML_KV
**
colKVs
=
malloc
(
numCols
*
sizeof
(
TAOS_SML_KV
*
));
int
r
=
fromIndex
;
int32_t
freeBytes
=
tsMaxSQLStringLen
+
1
;
int32_t
totalLen
=
0
;
totalLen
+=
sprintf
(
sql
,
"insert into %s using %s ("
,
cTableName
,
sTableName
);
int
ret
=
0
;
ret
=
smlSnprintf
(
sql
,
&
totalLen
,
capacity
,
" %s using %s ("
,
cTableName
,
sTableName
);
if
(
ret
!=
0
)
{
goto
_cleanup
;
}
for
(
int
i
=
0
;
i
<
numTags
;
++
i
)
{
SSchema
*
tagSchema
=
taosArrayGet
(
tagsSchema
,
i
);
totalLen
+=
snprintf
(
sql
+
totalLen
,
freeBytes
-
totalLen
,
"%s,"
,
tagSchema
->
name
);
ret
=
smlSnprintf
(
sql
,
&
totalLen
,
capacity
,
"%s,"
,
tagSchema
->
name
);
if
(
ret
!=
0
)
{
goto
_cleanup
;
}
}
--
totalLen
;
totalLen
+=
snprintf
(
sql
+
totalLen
,
freeBytes
-
totalLen
,
")"
);
totalLen
+=
snprintf
(
sql
+
totalLen
,
freeBytes
-
totalLen
,
" tags ("
);
ret
=
smlSnprintf
(
sql
,
&
totalLen
,
capacity
,
") tags ("
);
if
(
ret
!=
0
)
{
goto
_cleanup
;
}
// for (int i = 0; i < numTags; ++i) {
// snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
// }
for
(
int
i
=
0
;
i
<
numTags
;
++
i
)
{
if
(
capacity
-
totalLen
<
TSDB_MAX_BYTES_PER_ROW
)
{
goto
_cleanup
;
}
if
(
tagKVs
[
i
]
==
NULL
)
{
totalLen
+=
snprintf
(
sql
+
totalLen
,
freeBytes
-
totalLen
,
"NULL,"
);
ret
=
smlSnprintf
(
sql
,
&
totalLen
,
capacity
,
"NULL,"
);
if
(
ret
!=
0
)
{
goto
_cleanup
;
}
}
else
{
TAOS_SML_KV
*
kv
=
tagKVs
[
i
];
size_t
beforeLen
=
totalLen
;
int32_t
len
=
0
;
converToStr
(
sql
+
beforeLen
,
kv
->
type
,
kv
->
value
,
kv
->
length
,
&
len
);
totalLen
+=
len
;
totalLen
+=
snprintf
(
sql
+
totalLen
,
freeBytes
-
totalLen
,
","
);
ret
=
smlSnprintf
(
sql
,
&
totalLen
,
capacity
,
","
);
if
(
ret
!=
0
)
{
goto
_cleanup
;
}
}
}
--
totalLen
;
totalLen
+=
snprintf
(
sql
+
totalLen
,
freeBytes
-
totalLen
,
") ("
);
ret
=
smlSnprintf
(
sql
,
&
totalLen
,
capacity
,
") ("
);
if
(
ret
!=
0
)
{
goto
_cleanup
;
}
for
(
int
i
=
0
;
i
<
numCols
;
++
i
)
{
SSchema
*
colSchema
=
taosArrayGet
(
colsSchema
,
i
);
totalLen
+=
snprintf
(
sql
+
totalLen
,
freeBytes
-
totalLen
,
"%s,"
,
colSchema
->
name
);
ret
=
smlSnprintf
(
sql
,
&
totalLen
,
capacity
,
"%s,"
,
colSchema
->
name
);
if
(
ret
!=
0
)
{
goto
_cleanup
;
}
}
--
totalLen
;
totalLen
+=
snprintf
(
sql
+
totalLen
,
freeBytes
-
totalLen
,
") values "
);
TAOS_SML_KV
**
colKVs
=
malloc
(
numCols
*
sizeof
(
TAOS_SML_KV
*
));
for
(
int
r
=
0
;
r
<
rows
;
++
r
)
{
totalLen
+=
snprintf
(
sql
+
totalLen
,
freeBytes
-
totalLen
,
"("
);
ret
=
smlSnprintf
(
sql
,
&
totalLen
,
capacity
,
") values "
);
if
(
ret
!=
0
)
{
goto
_cleanup
;
}
for
(;
r
<
rows
;
++
r
)
{
if
(
capacity
-
totalLen
<
TSDB_MAX_BYTES_PER_ROW
)
{
break
;
}
ret
=
smlSnprintf
(
sql
,
&
totalLen
,
capacity
,
"("
);
if
(
ret
!=
0
)
{
goto
_cleanup
;
}
memset
(
colKVs
,
0
,
numCols
*
sizeof
(
TAOS_SML_KV
*
));
TAOS_SML_DATA_POINT
*
point
=
taosArrayGetP
(
cTablePoints
,
r
);
...
...
@@ -867,372 +904,215 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa
for
(
int
i
=
0
;
i
<
numCols
;
++
i
)
{
if
(
colKVs
[
i
]
==
NULL
)
{
totalLen
+=
snprintf
(
sql
+
totalLen
,
freeBytes
-
totalLen
,
"NULL,"
);
ret
=
smlSnprintf
(
sql
,
&
totalLen
,
capacity
,
"NULL,"
);
if
(
ret
!=
0
)
{
goto
_cleanup
;
}
}
else
{
TAOS_SML_KV
*
kv
=
colKVs
[
i
];
size_t
beforeLen
=
totalLen
;
int32_t
len
=
0
;
converToStr
(
sql
+
beforeLen
,
kv
->
type
,
kv
->
value
,
kv
->
length
,
&
len
);
totalLen
+=
len
;
totalLen
+=
snprintf
(
sql
+
totalLen
,
freeBytes
-
totalLen
,
","
);
ret
=
smlSnprintf
(
sql
,
&
totalLen
,
capacity
,
","
);
if
(
ret
!=
0
)
{
goto
_cleanup
;
}
}
}
--
totalLen
;
totalLen
+=
snprintf
(
sql
+
totalLen
,
freeBytes
-
totalLen
,
")"
);
ret
=
smlSnprintf
(
sql
,
&
totalLen
,
capacity
,
")"
);
if
(
ret
!=
0
)
{
goto
_cleanup
;
}
}
_cleanup:
free
(
colKVs
);
sql
[
totalLen
]
=
'\0'
;
tscDebug
(
"SML:0x%"
PRIx64
" insert child table table %s of super table %s sql: %s"
,
info
->
id
,
cTableName
,
sTableName
,
sql
);
bool
tryAgain
=
false
;
int32_t
try
=
0
;
do
{
TAOS_RES
*
res
=
taos_query
(
taos
,
sql
);
code
=
taos_errno
(
res
);
if
(
code
!=
0
)
{
tscError
(
"SML:0x%"
PRIx64
" taos_query return %d:%s"
,
info
->
id
,
code
,
taos_errstr
(
res
));
}
tscDebug
(
"SML:0x%"
PRIx64
" taos_query inserted %d rows"
,
info
->
id
,
taos_affected_rows
(
res
));
info
->
affectedRows
+=
taos_affected_rows
(
res
);
taos_free_result
(
res
);
tryAgain
=
false
;
if
((
code
==
TSDB_CODE_TDB_INVALID_TABLE_ID
||
code
==
TSDB_CODE_VND_INVALID_VGROUP_ID
||
code
==
TSDB_CODE_TDB_TABLE_RECONFIGURE
||
code
==
TSDB_CODE_APP_NOT_READY
||
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
&&
try
++
<
TSDB_MAX_REPLICA
)
{
tryAgain
=
true
;
}
if
(
code
==
TSDB_CODE_TDB_INVALID_TABLE_ID
||
code
==
TSDB_CODE_VND_INVALID_VGROUP_ID
)
{
TAOS_RES
*
res2
=
taos_query
(
taos
,
"RESET QUERY CACHE"
);
int32_t
code2
=
taos_errno
(
res2
);
if
(
code2
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"SML:0x%"
PRIx64
" insert child table by sql. reset query cache. error: %s"
,
info
->
id
,
taos_errstr
(
res2
));
}
taos_free_result
(
res2
);
if
(
tryAgain
)
{
taosMsleep
(
100
*
(
2
<<
try
));
}
}
if
(
code
==
TSDB_CODE_APP_NOT_READY
||
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
if
(
tryAgain
)
{
taosMsleep
(
100
*
(
2
<<
try
));
}
}
}
while
(
tryAgain
);
free
(
sql
);
if
(
r
==
fromIndex
)
{
tscError
(
"buffer can not fit one line"
);
*
cTableSqlLen
=
0
;
}
else
{
*
cTableSqlLen
=
totalLen
;
}
*
nextIndex
=
r
;
return
code
;
return
0
;
}
static
int32_t
applyChildTableDataPointsWithStmt
(
TAOS
*
taos
,
char
*
cTableName
,
char
*
sTableName
,
SSmlSTableSchema
*
sTableSchema
,
SArray
*
cTablePoints
,
size_t
rowSize
,
SSmlLinesInfo
*
info
)
{
size_t
numTags
=
taosArrayGetSize
(
sTableSchema
->
tags
);
size_t
numCols
=
taosArrayGetSize
(
sTableSchema
->
fields
);
size_t
rows
=
taosArrayGetSize
(
cTablePoints
);
static
void
insertCallback
(
void
*
param
,
TAOS_RES
*
res
,
int32_t
notUsedCode
)
{
SSmlSqlInsertBatch
*
batch
=
(
SSmlSqlInsertBatch
*
)
param
;
batch
->
code
=
taos_errno
(
res
);
TAOS_SML_KV
*
tagKVs
[
TSDB_MAX_TAGS
]
=
{
0
};
for
(
int
i
=
0
;
i
<
rows
;
++
i
)
{
TAOS_SML_DATA_POINT
*
pDataPoint
=
taosArrayGetP
(
cTablePoints
,
i
);
for
(
int
j
=
0
;
j
<
pDataPoint
->
tagNum
;
++
j
)
{
TAOS_SML_KV
*
kv
=
pDataPoint
->
tags
+
j
;
tagKVs
[
kv
->
fieldSchemaIdx
]
=
kv
;
}
if
(
batch
->
code
!=
0
)
{
tscError
(
"SML:0x%"
PRIx64
" batch %d , taos_query_a return %d:%s"
,
batch
->
id
,
batch
->
index
,
batch
->
code
,
taos_errstr
(
res
));
}
tscDebug
(
"SML:0x%"
PRIx64
" batch %d, taos_query inserted %d rows"
,
batch
->
id
,
batch
->
index
,
taos_affected_rows
(
res
));
batch
->
affectedRows
=
taos_affected_rows
(
res
);
taos_free_result
(
res
);
//tag bind
SArray
*
tagBinds
=
taosArrayInit
(
numTags
,
sizeof
(
TAOS_BIND
));
taosArraySetSize
(
tagBinds
,
numTags
);
int
isNullColBind
=
TSDB_TRUE
;
for
(
int
j
=
0
;
j
<
numTags
;
++
j
)
{
TAOS_BIND
*
bind
=
taosArrayGet
(
tagBinds
,
j
);
bind
->
is_null
=
&
isNullColBind
;
}
for
(
int
j
=
0
;
j
<
numTags
;
++
j
)
{
if
(
tagKVs
[
j
]
==
NULL
)
continue
;
TAOS_SML_KV
*
kv
=
tagKVs
[
j
];
TAOS_BIND
*
bind
=
taosArrayGet
(
tagBinds
,
kv
->
fieldSchemaIdx
);
bind
->
buffer_type
=
kv
->
type
;
bind
->
length
=
malloc
(
sizeof
(
uintptr_t
*
));
*
bind
->
length
=
kv
->
length
;
bind
->
buffer
=
kv
->
value
;
bind
->
is_null
=
NULL
;
int32_t
code
=
batch
->
code
;
batch
->
tryAgain
=
false
;
batch
->
resetQueryCache
=
false
;
batch
->
sleep
=
false
;
if
((
code
==
TSDB_CODE_TDB_INVALID_TABLE_ID
||
code
==
TSDB_CODE_VND_INVALID_VGROUP_ID
||
code
==
TSDB_CODE_TDB_TABLE_RECONFIGURE
||
code
==
TSDB_CODE_APP_NOT_READY
||
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
&&
batch
->
tryTimes
<
TSDB_MAX_REPLICA
)
{
batch
->
tryAgain
=
true
;
}
//rows bind
SArray
*
rowsBind
=
taosArrayInit
(
rows
,
POINTER_BYTES
);
for
(
int
i
=
0
;
i
<
rows
;
++
i
)
{
TAOS_SML_DATA_POINT
*
point
=
taosArrayGetP
(
cTablePoints
,
i
);
TAOS_BIND
*
colBinds
=
calloc
(
numCols
,
sizeof
(
TAOS_BIND
));
if
(
colBinds
==
NULL
)
{
tscError
(
"SML:0x%"
PRIx64
" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
"num of rows: %zu, num of cols: %zu"
,
info
->
id
,
rows
,
numCols
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
for
(
int
j
=
0
;
j
<
numCols
;
++
j
)
{
TAOS_BIND
*
bind
=
colBinds
+
j
;
bind
->
is_null
=
&
isNullColBind
;
}
for
(
int
j
=
0
;
j
<
point
->
fieldNum
;
++
j
)
{
TAOS_SML_KV
*
kv
=
point
->
fields
+
j
;
TAOS_BIND
*
bind
=
colBinds
+
kv
->
fieldSchemaIdx
;
bind
->
buffer_type
=
kv
->
type
;
bind
->
length
=
malloc
(
sizeof
(
uintptr_t
*
));
*
bind
->
length
=
kv
->
length
;
bind
->
buffer
=
kv
->
value
;
bind
->
is_null
=
NULL
;
if
(
code
==
TSDB_CODE_TDB_INVALID_TABLE_ID
||
code
==
TSDB_CODE_VND_INVALID_VGROUP_ID
)
{
batch
->
resetQueryCache
=
true
;
if
(
batch
->
tryAgain
)
{
batch
->
sleep
=
true
;
}
taosArrayPush
(
rowsBind
,
&
colBinds
);
}
int32_t
code
=
0
;
code
=
insertChildTablePointsBatch
(
taos
,
cTableName
,
sTableName
,
sTableSchema
->
tags
,
tagBinds
,
sTableSchema
->
fields
,
rowsBind
,
rowSize
,
info
);
if
(
code
!=
0
)
{
tscError
(
"SML:0x%"
PRIx64
" insert into child table %s failed. error %s"
,
info
->
id
,
cTableName
,
tstrerror
(
code
));
}
//free rows bind
for
(
int
i
=
0
;
i
<
rows
;
++
i
)
{
TAOS_BIND
*
colBinds
=
taosArrayGetP
(
rowsBind
,
i
);
for
(
int
j
=
0
;
j
<
numCols
;
++
j
)
{
TAOS_BIND
*
bind
=
colBinds
+
j
;
free
(
bind
->
length
);
if
(
code
==
TSDB_CODE_APP_NOT_READY
||
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
if
(
batch
->
tryAgain
)
{
batch
->
sleep
=
true
;
}
free
(
colBinds
);
}
taosArrayDestroy
(
&
rowsBind
);
//free tag bind
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tagBinds
);
++
i
)
{
TAOS_BIND
*
bind
=
taosArrayGet
(
tagBinds
,
i
);
free
(
bind
->
length
);
}
taosArrayDestroy
(
&
tagBinds
);
return
code
;
tsem_post
(
&
batch
->
sem
);
}
static
int32_t
insertChildTablePointsBatch
(
TAOS
*
taos
,
char
*
cTableName
,
char
*
sTableName
,
SArray
*
tagsSchema
,
SArray
*
tagsBind
,
SArray
*
colsSchema
,
SArray
*
rowsBind
,
size_t
rowSize
,
SSmlLinesInfo
*
info
)
{
size_t
numTags
=
taosArrayGetSize
(
tagsSchema
);
size_t
numCols
=
taosArrayGetSize
(
colsSchema
);
char
*
sql
=
malloc
(
tsMaxSQLStringLen
+
1
);
if
(
sql
==
NULL
)
{
tscError
(
"malloc sql memory error"
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
static
int32_t
applyDataPointsWithSqlInsert
(
TAOS
*
taos
,
TAOS_SML_DATA_POINT
*
points
,
int32_t
numPoints
,
SArray
*
stableSchemas
,
SSmlLinesInfo
*
info
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
freeBytes
=
tsMaxSQLStringLen
+
1
;
sprintf
(
sql
,
"insert into ? using %s ("
,
sTableName
);
for
(
int
i
=
0
;
i
<
numTags
;
++
i
)
{
SSchema
*
tagSchema
=
taosArrayGet
(
tagsSchema
,
i
);
snprintf
(
sql
+
strlen
(
sql
),
freeBytes
-
strlen
(
sql
),
"%s,"
,
tagSchema
->
name
);
SHashObj
*
cname2points
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
arrangePointsByChildTableName
(
points
,
numPoints
,
cname2points
,
stableSchemas
,
info
);
for
(
int
i
=
0
;
i
<
MAX_SML_SQL_INSERT_BATCHES
;
++
i
)
{
info
->
batches
[
i
].
id
=
info
->
id
;
info
->
batches
[
i
].
index
=
i
;
info
->
batches
[
i
].
sql
=
NULL
;
info
->
batches
[
i
].
tryTimes
=
0
;
tsem_init
(
&
info
->
batches
[
i
].
sem
,
0
,
0
);
}
snprintf
(
sql
+
strlen
(
sql
)
-
1
,
freeBytes
-
strlen
(
sql
)
+
1
,
")"
);
snprintf
(
sql
+
strlen
(
sql
),
freeBytes
-
strlen
(
sql
),
" tags ("
);
info
->
numBatches
=
0
;
SSmlSqlInsertBatch
*
batch
=
info
->
batches
;
batch
->
sql
=
malloc
(
tsMaxSQLStringLen
+
1
);
int32_t
freeBytes
=
tsMaxSQLStringLen
;
int32_t
usedBytes
=
sprintf
(
batch
->
sql
,
"insert into"
);
freeBytes
-=
usedBytes
;
for
(
int
i
=
0
;
i
<
numTags
;
++
i
)
{
snprintf
(
sql
+
strlen
(
sql
),
freeBytes
-
strlen
(
sql
),
"?,"
);
}
snprintf
(
sql
+
strlen
(
sql
)
-
1
,
freeBytes
-
strlen
(
sql
)
+
1
,
") ("
);
int32_t
cTableSqlLen
=
0
;
for
(
int
i
=
0
;
i
<
numCols
;
++
i
)
{
SSchema
*
colSchema
=
taosArrayGet
(
colsSchema
,
i
);
snprintf
(
sql
+
strlen
(
sql
),
freeBytes
-
strlen
(
sql
),
"%s,"
,
colSchema
->
name
);
}
snprintf
(
sql
+
strlen
(
sql
)
-
1
,
freeBytes
-
strlen
(
sql
)
+
1
,
") values ("
);
SArray
**
pCTablePoints
=
taosHashIterate
(
cname2points
,
NULL
);
while
(
pCTablePoints
)
{
SArray
*
cTablePoints
=
*
pCTablePoints
;
for
(
int
i
=
0
;
i
<
numCols
;
++
i
)
{
snprintf
(
sql
+
strlen
(
sql
),
freeBytes
-
strlen
(
sql
),
"?,"
);
}
snprintf
(
sql
+
strlen
(
sql
)
-
1
,
freeBytes
-
strlen
(
sql
)
+
1
,
")"
);
sql
[
strlen
(
sql
)]
=
'\0'
;
TAOS_SML_DATA_POINT
*
point
=
taosArrayGetP
(
cTablePoints
,
0
);
SSmlSTableSchema
*
sTableSchema
=
taosArrayGet
(
stableSchemas
,
point
->
schemaIdx
);
tscDebug
(
"SML:0x%"
PRIx64
" insert child table table %s of super table %s : %s"
,
info
->
id
,
cTableName
,
sTableName
,
sql
);
int32_t
nextIndex
=
0
;
int32_t
fromIndex
=
nextIndex
;
while
(
nextIndex
!=
taosArrayGetSize
(
cTablePoints
))
{
fromIndex
=
nextIndex
;
code
=
addChildTableDataPointsToInsertSql
(
point
->
childTableName
,
point
->
stableName
,
sTableSchema
,
cTablePoints
,
batch
->
sql
+
usedBytes
,
freeBytes
,
&
cTableSqlLen
,
fromIndex
,
&
nextIndex
,
info
);
tscDebug
(
"SML:0x%"
PRIx64
" add child table points to SQL. child table: %s of super table %s. range[%d-%d)."
,
info
->
id
,
point
->
childTableName
,
point
->
stableName
,
fromIndex
,
nextIndex
);
usedBytes
+=
cTableSqlLen
;
freeBytes
-=
cTableSqlLen
;
if
(
nextIndex
!=
taosArrayGetSize
(
cTablePoints
))
{
batch
->
sql
[
usedBytes
]
=
'\0'
;
info
->
numBatches
++
;
tscDebug
(
"SML:0x%"
PRIx64
" sql: %s"
,
info
->
id
,
batch
->
sql
);
if
(
info
->
numBatches
>=
MAX_SML_SQL_INSERT_BATCHES
)
{
tscError
(
"SML:0x%"
PRIx64
" Apply points failed. exceeds max sql insert batches"
,
info
->
id
);
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
cleanup
;
}
size_t
maxBatchSize
=
TSDB_MAX_WAL_SIZE
/
rowSize
*
2
/
3
;
size_t
rows
=
taosArrayGetSize
(
rowsBind
);
size_t
batchSize
=
MIN
(
maxBatchSize
,
rows
);
tscDebug
(
"SML:0x%"
PRIx64
" insert rows into child table %s. num of rows: %zu, batch size: %zu"
,
info
->
id
,
cTableName
,
rows
,
batchSize
);
SArray
*
batchBind
=
taosArrayInit
(
batchSize
,
POINTER_BYTES
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
for
(
int
i
=
0
;
i
<
rows
;)
{
int
j
=
i
;
for
(;
j
<
i
+
batchSize
&&
j
<
rows
;
++
j
)
{
taosArrayPush
(
batchBind
,
taosArrayGet
(
rowsBind
,
j
));
}
if
(
j
>
i
)
{
tscDebug
(
"SML:0x%"
PRIx64
" insert child table batch from line %d to line %d."
,
info
->
id
,
i
,
j
-
1
);
code
=
doInsertChildTablePoints
(
taos
,
sql
,
cTableName
,
tagsBind
,
batchBind
,
info
);
if
(
code
!=
0
)
{
taosArrayDestroy
(
&
batchBind
);
tfree
(
sql
);
return
code
;
batch
=
&
info
->
batches
[
info
->
numBatches
];
batch
->
sql
=
malloc
(
tsMaxSQLStringLen
+
1
);
freeBytes
=
tsMaxSQLStringLen
;
usedBytes
=
sprintf
(
batch
->
sql
,
"insert into"
);
freeBytes
-=
usedBytes
;
}
taosArrayClear
(
batchBind
);
}
i
=
j
;
}
taosArrayDestroy
(
&
batchBind
);
tfree
(
sql
);
return
code
;
}
static
int32_t
doInsertChildTablePoints
(
TAOS
*
taos
,
char
*
sql
,
char
*
cTableName
,
SArray
*
tagsBind
,
SArray
*
batchBind
,
SSmlLinesInfo
*
info
)
{
int32_t
code
=
0
;
TAOS_STMT
*
stmt
=
taos_stmt_init
(
taos
);
if
(
stmt
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
pCTablePoints
=
taosHashIterate
(
cname2points
,
pCTablePoints
);
}
code
=
taos_stmt_prepare
(
stmt
,
sql
,
(
unsigned
long
)
strlen
(
sql
))
;
if
(
code
!=
0
)
{
tscError
(
"SML:0x%"
PRIx64
"
taos_stmt_prepare return %d:%s"
,
info
->
id
,
code
,
taos_stmt_errstr
(
stmt
)
);
taos_stmt_close
(
stmt
)
;
return
code
;
batch
->
sql
[
usedBytes
]
=
'\0'
;
info
->
numBatches
++
;
tscDebug
(
"SML:0x%"
PRIx64
" sql: %s"
,
info
->
id
,
batch
->
sql
);
if
(
info
->
numBatches
>=
MAX_SML_SQL_INSERT_BATCHES
)
{
tscError
(
"SML:0x%"
PRIx64
"
Apply points failed. exceeds max sql insert batches"
,
info
->
id
);
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
cleanup
;
}
bool
batchesExecuted
[
MAX_SML_SQL_INSERT_BATCHES
]
=
{
false
};
bool
tryAgain
=
false
;
int32_t
try
=
0
;
do
{
code
=
taos_stmt_set_tbname_tags
(
stmt
,
cTableName
,
TARRAY_GET_START
(
tagsBind
));
if
(
code
!=
0
)
{
tscError
(
"SML:0x%"
PRIx64
" taos_stmt_set_tbname return %d:%s"
,
info
->
id
,
code
,
taos_stmt_errstr
(
stmt
));
int
affectedRows
=
taos_stmt_affected_rows
(
stmt
);
info
->
affectedRows
+=
affectedRows
;
taos_stmt_close
(
stmt
);
return
code
;
}
size_t
rows
=
taosArrayGetSize
(
batchBind
);
for
(
int32_t
i
=
0
;
i
<
rows
;
++
i
)
{
TAOS_BIND
*
colsBinds
=
taosArrayGetP
(
batchBind
,
i
);
code
=
taos_stmt_bind_param
(
stmt
,
colsBinds
);
if
(
code
!=
0
)
{
tscError
(
"SML:0x%"
PRIx64
" taos_stmt_bind_param return %d:%s"
,
info
->
id
,
code
,
taos_stmt_errstr
(
stmt
));
int
affectedRows
=
taos_stmt_affected_rows
(
stmt
);
info
->
affectedRows
+=
affectedRows
;
taos_stmt_close
(
stmt
);
return
code
;
}
code
=
taos_stmt_add_batch
(
stmt
);
if
(
code
!=
0
)
{
tscError
(
"SML:0x%"
PRIx64
" taos_stmt_add_batch return %d:%s"
,
info
->
id
,
code
,
taos_stmt_errstr
(
stmt
));
int
affectedRows
=
taos_stmt_affected_rows
(
stmt
);
info
->
affectedRows
+=
affectedRows
;
for
(
int
i
=
0
;
i
<
info
->
numBatches
;
++
i
)
{
SSmlSqlInsertBatch
*
insertBatch
=
&
info
->
batches
[
i
];
insertBatch
->
tryTimes
=
1
;
taos_query_a
(
taos
,
insertBatch
->
sql
,
insertCallback
,
insertBatch
);
batchesExecuted
[
i
]
=
true
;
}
int32_t
triedBatches
=
info
->
numBatches
;
taos_stmt_close
(
stmt
);
return
code
;
while
(
triedBatches
>
0
)
{
for
(
int
i
=
0
;
i
<
info
->
numBatches
;
++
i
)
{
if
(
batchesExecuted
[
i
])
{
tsem_wait
(
&
info
->
batches
[
i
].
sem
);
info
->
affectedRows
+=
info
->
batches
[
i
].
affectedRows
;
}
}
code
=
taos_stmt_execute
(
stmt
);
if
(
code
!=
0
)
{
tscError
(
"SML:0x%"
PRIx64
" taos_stmt_execute return %d:%s, try:%d"
,
info
->
id
,
code
,
taos_stmt_errstr
(
stmt
),
try
);
}
tscDebug
(
"SML:0x%"
PRIx64
" taos_stmt_execute inserted %d rows"
,
info
->
id
,
taos_stmt_affected_rows
(
stmt
));
tryAgain
=
false
;
if
((
code
==
TSDB_CODE_TDB_INVALID_TABLE_ID
||
code
==
TSDB_CODE_VND_INVALID_VGROUP_ID
||
code
==
TSDB_CODE_TDB_TABLE_RECONFIGURE
||
code
==
TSDB_CODE_APP_NOT_READY
||
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
&&
try
++
<
TSDB_MAX_REPLICA
)
{
tryAgain
=
true
;
for
(
int
i
=
0
;
i
<
info
->
numBatches
;
++
i
)
{
SSmlSqlInsertBatch
*
b
=
info
->
batches
+
i
;
if
(
b
->
resetQueryCache
)
{
TAOS_RES
*
res
=
taos_query
(
taos
,
"RESET QUERY CACHE"
);
taos_free_result
(
res
);
break
;
}
}
if
(
code
==
TSDB_CODE_TDB_INVALID_TABLE_ID
||
code
==
TSDB_CODE_VND_INVALID_VGROUP_ID
)
{
TAOS_RES
*
res2
=
taos_query
(
taos
,
"RESET QUERY CACHE"
);
int32_t
code2
=
taos_errno
(
res2
);
if
(
code2
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"SML:0x%"
PRIx64
" insert child table. reset query cache. error: %s"
,
info
->
id
,
taos_errstr
(
res2
));
}
taos_free_result
(
res2
);
if
(
tryAgain
)
{
taosMsleep
(
100
*
(
2
<<
try
));
for
(
int
i
=
0
;
i
<
info
->
numBatches
;
++
i
)
{
SSmlSqlInsertBatch
*
b
=
info
->
batches
+
i
;
if
(
b
->
sleep
)
{
taosMsleep
(
100
*
(
2
<<
b
->
tryTimes
));
break
;
}
}
if
(
code
==
TSDB_CODE_APP_NOT_READY
||
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
if
(
tryAgain
)
{
taosMsleep
(
100
*
(
2
<<
try
));
memset
(
batchesExecuted
,
0
,
sizeof
(
batchesExecuted
));
triedBatches
=
0
;
for
(
int
i
=
0
;
i
<
info
->
numBatches
;
++
i
)
{
SSmlSqlInsertBatch
*
insertBatch
=
&
info
->
batches
[
i
];
if
(
insertBatch
->
tryAgain
)
{
insertBatch
->
tryTimes
++
;
taos_query_a
(
taos
,
insertBatch
->
sql
,
insertCallback
,
insertBatch
);
batchesExecuted
[
i
]
=
true
;
triedBatches
++
;
}
}
}
while
(
tryAgain
);
int
affectedRows
=
taos_stmt_affected_rows
(
stmt
);
info
->
affectedRows
+=
affectedRows
;
taos_stmt_close
(
stmt
);
return
code
;
return
0
;
}
static
int32_t
applyChildTableDataPoints
(
TAOS
*
taos
,
char
*
cTableName
,
char
*
sTableName
,
SSmlSTableSchema
*
sTableSchema
,
SArray
*
cTablePoints
,
size_t
rowSize
,
SSmlLinesInfo
*
info
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
size_t
childTableDataPoints
=
taosArrayGetSize
(
cTablePoints
);
if
(
childTableDataPoints
<
10
)
{
code
=
applyChildTableDataPointsWithInsertSQL
(
taos
,
cTableName
,
sTableName
,
sTableSchema
,
cTablePoints
,
rowSize
,
info
);
}
else
{
code
=
applyChildTableDataPointsWithStmt
(
taos
,
cTableName
,
sTableName
,
sTableSchema
,
cTablePoints
,
rowSize
,
info
);
}
return
code
;
}
static
int32_t
applyDataPoints
(
TAOS
*
taos
,
TAOS_SML_DATA_POINT
*
points
,
int32_t
numPoints
,
SArray
*
stableSchemas
,
SSmlLinesInfo
*
info
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SHashObj
*
cname2points
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
arrangePointsByChildTableName
(
points
,
numPoints
,
cname2points
,
stableSchemas
,
info
);
SArray
**
pCTablePoints
=
taosHashIterate
(
cname2points
,
NULL
);
while
(
pCTablePoints
)
{
SArray
*
cTablePoints
=
*
pCTablePoints
;
TAOS_SML_DATA_POINT
*
point
=
taosArrayGetP
(
cTablePoints
,
0
);
SSmlSTableSchema
*
sTableSchema
=
taosArrayGet
(
stableSchemas
,
point
->
schemaIdx
);
size_t
rowSize
=
0
;
size_t
numCols
=
taosArrayGetSize
(
sTableSchema
->
fields
);
for
(
int
i
=
0
;
i
<
numCols
;
++
i
)
{
SSchema
*
colSchema
=
taosArrayGet
(
sTableSchema
->
fields
,
i
);
rowSize
+=
colSchema
->
bytes
;
}
tscDebug
(
"SML:0x%"
PRIx64
" apply child table points. child table: %s of super table %s, row size: %zu"
,
info
->
id
,
point
->
childTableName
,
point
->
stableName
,
rowSize
);
code
=
applyChildTableDataPoints
(
taos
,
point
->
childTableName
,
point
->
stableName
,
sTableSchema
,
cTablePoints
,
rowSize
,
info
);
if
(
code
!=
0
)
{
tscError
(
"SML:0x%"
PRIx64
" Apply child table points failed. child table %s, error %s"
,
info
->
id
,
point
->
childTableName
,
tstrerror
(
code
));
goto
cleanup
;
code
=
0
;
for
(
int
i
=
0
;
i
<
info
->
numBatches
;
++
i
)
{
SSmlSqlInsertBatch
*
b
=
info
->
batches
+
i
;
if
(
b
->
code
!=
0
)
{
code
=
b
->
code
;
}
tscDebug
(
"SML:0x%"
PRIx64
" successfully applied data points of child table %s"
,
info
->
id
,
point
->
childTableName
);
pCTablePoints
=
taosHashIterate
(
cname2points
,
pCTablePoints
);
}
cleanup:
for
(
int
i
=
0
;
i
<
MAX_SML_SQL_INSERT_BATCHES
;
++
i
)
{
free
(
info
->
batches
[
i
].
sql
);
info
->
batches
[
i
].
sql
=
NULL
;
tsem_destroy
(
&
info
->
batches
[
i
].
sem
);
}
pCTablePoints
=
taosHashIterate
(
cname2points
,
NULL
);
while
(
pCTablePoints
)
{
SArray
*
pPoints
=
*
pCTablePoints
;
...
...
@@ -1298,6 +1178,7 @@ static int doSmlInsertOneDataPoint(TAOS* taos, TAOS_SML_DATA_POINT* point, SSmlL
for
(
int
col
=
1
;
col
<
point
->
fieldNum
;
++
col
)
{
TAOS_SML_KV
*
kv
=
point
->
fields
+
col
;
int32_t
len
=
0
;
if
(
freeBytes
-
sqlLen
<=
kv
->
length
)
{
tscError
(
"SML:0x%"
PRIx64
" no free space for converToStr"
,
info
->
id
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -1362,7 +1243,7 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine
}
tscDebug
(
"SML:0x%"
PRIx64
" apply data points"
,
info
->
id
);
code
=
applyDataPoints
(
taos
,
points
,
numPoint
,
stableSchemas
,
info
);
code
=
applyDataPoints
WithSqlInsert
(
taos
,
points
,
numPoint
,
stableSchemas
,
info
);
if
(
code
!=
0
)
{
tscError
(
"SML:0x%"
PRIx64
" error apply data points : %s"
,
info
->
id
,
tstrerror
(
code
));
}
...
...
@@ -2163,7 +2044,7 @@ static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash
return
TSDB_CODE_TSC_LINE_SYNTAX_ERROR
;
}
pKV
->
key
=
calloc
(
len
+
TS_BACKQUOTE_CHAR_SIZE
+
1
,
1
);
pKV
->
key
=
malloc
(
len
+
TS_BACKQUOTE_CHAR_SIZE
+
1
);
memcpy
(
pKV
->
key
,
key
,
len
+
1
);
addEscapeCharToString
(
pKV
->
key
,
len
);
tscDebug
(
"SML:0x%"
PRIx64
" Key:%s|len:%d"
,
info
->
id
,
pKV
->
key
,
len
);
...
...
@@ -2206,7 +2087,7 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **index,
switch
(
tag_state
)
{
case
tag_common
:
if
(
back_slash
==
true
)
{
if
(
*
cur
!=
','
&&
*
cur
!=
'='
&&
*
cur
!=
' '
)
{
if
(
*
cur
!=
','
&&
*
cur
!=
'='
&&
*
cur
!=
' '
&&
*
cur
!=
'n'
)
{
tscError
(
"SML:0x%"
PRIx64
" tag value: state(%d), incorrect character(%c) escaped"
,
info
->
id
,
tag_state
,
*
cur
);
ret
=
TSDB_CODE_TSC_LINE_SYNTAX_ERROR
;
goto
error
;
...
...
@@ -2271,7 +2152,7 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **index,
break
;
case
tag_lqoute
:
if
(
back_slash
==
true
)
{
if
(
*
cur
!=
','
&&
*
cur
!=
'='
&&
*
cur
!=
' '
)
{
if
(
*
cur
!=
','
&&
*
cur
!=
'='
&&
*
cur
!=
' '
&&
*
cur
!=
'n'
)
{
tscError
(
"SML:0x%"
PRIx64
" tag value: state(%d), incorrect character(%c) escaped"
,
info
->
id
,
tag_state
,
*
cur
);
ret
=
TSDB_CODE_TSC_LINE_SYNTAX_ERROR
;
goto
error
;
...
...
@@ -2342,7 +2223,7 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **index,
switch
(
val_state
)
{
case
val_common
:
if
(
back_slash
==
true
)
{
if
(
*
cur
!=
'\\'
&&
*
cur
!=
'"'
)
{
if
(
*
cur
!=
'\\'
&&
*
cur
!=
'"'
&&
*
cur
!=
'n'
)
{
tscError
(
"SML:0x%"
PRIx64
" field value: state(%d), incorrect character(%c) escaped"
,
info
->
id
,
val_state
,
*
cur
);
ret
=
TSDB_CODE_TSC_LINE_SYNTAX_ERROR
;
goto
error
;
...
...
@@ -2437,7 +2318,7 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **index,
break
;
case
val_lqoute
:
if
(
back_slash
==
true
)
{
if
(
*
cur
!=
'\\'
&&
*
cur
!=
'"'
)
{
if
(
*
cur
!=
'\\'
&&
*
cur
!=
'"'
&&
*
cur
!=
'n'
)
{
tscError
(
"SML:0x%"
PRIx64
" field value: state(%d), incorrect character(%c) escaped"
,
info
->
id
,
val_state
,
*
cur
);
ret
=
TSDB_CODE_TSC_LINE_SYNTAX_ERROR
;
goto
error
;
...
...
@@ -2606,13 +2487,16 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
int32_t
capacity
=
0
;
if
(
isField
)
{
capacity
=
64
;
*
pKVs
=
calloc
(
capacity
,
sizeof
(
TAOS_SML_KV
));
*
pKVs
=
malloc
(
capacity
*
sizeof
(
TAOS_SML_KV
));
memset
(
*
pKVs
,
0
,
capacity
*
sizeof
(
TAOS_SML_KV
));
// leave space for timestamp;
pkv
=
*
pKVs
;
pkv
++
;
*
num_kvs
=
1
;
// ts fixed column
}
else
{
capacity
=
8
;
*
pKVs
=
calloc
(
capacity
,
sizeof
(
TAOS_SML_KV
));
*
pKVs
=
malloc
(
capacity
*
sizeof
(
TAOS_SML_KV
));
memset
(
*
pKVs
,
0
,
capacity
*
sizeof
(
TAOS_SML_KV
));
pkv
=
*
pKVs
;
}
...
...
@@ -2673,7 +2557,7 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
*
pKVs
=
more_kvs
;
//move pKV points to next TAOS_SML_KV block
if
(
isField
)
{
pkv
=
*
pKVs
+
*
num_kvs
+
1
;
pkv
=
*
pKVs
+
*
num_kvs
;
// first ts column reserved
}
else
{
pkv
=
*
pKVs
+
*
num_kvs
;
}
...
...
@@ -2695,7 +2579,7 @@ static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *t
tsField
->
key
=
malloc
(
strlen
(
ts
->
key
)
+
1
);
memcpy
(
tsField
->
key
,
ts
->
key
,
strlen
(
ts
->
key
)
+
1
);
memcpy
(
tsField
->
value
,
ts
->
value
,
ts
->
length
);
(
*
smlData
)
->
fieldNum
=
(
*
smlData
)
->
fieldNum
+
1
;
//(*smlData)->fieldNum = (*smlData)->fieldNum + 1; // already reserved for first ts column
free
(
ts
->
key
);
free
(
ts
->
value
);
...
...
@@ -2707,7 +2591,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf
int32_t
ret
=
TSDB_CODE_SUCCESS
;
uint8_t
has_tags
=
0
;
TAOS_SML_KV
*
timestamp
=
NULL
;
SHashObj
*
keyHashTable
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
SHashObj
*
keyHashTable
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
ret
=
parseSmlMeasurement
(
smlData
,
&
index
,
&
has_tags
,
info
);
if
(
ret
)
{
...
...
@@ -2753,14 +2637,21 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf
//=========================================================================
void
destroySmlDataPoint
(
TAOS_SML_DATA_POINT
*
point
)
{
TAOS_SML_KV
*
pkv
;
for
(
int
i
=
0
;
i
<
point
->
tagNum
;
++
i
)
{
free
((
point
->
tags
+
i
)
->
key
);
free
((
point
->
tags
+
i
)
->
value
);
pkv
=
point
->
tags
+
i
;
if
(
pkv
->
key
)
free
(
pkv
->
key
);
if
(
pkv
->
value
)
free
(
pkv
->
value
);
}
free
(
point
->
tags
);
for
(
int
i
=
0
;
i
<
point
->
fieldNum
;
++
i
)
{
free
((
point
->
fields
+
i
)
->
key
);
free
((
point
->
fields
+
i
)
->
value
);
pkv
=
point
->
fields
+
i
;
if
(
pkv
->
key
)
free
(
pkv
->
key
);
if
(
pkv
->
value
)
free
(
pkv
->
value
);
}
free
(
point
->
fields
);
free
(
point
->
stableName
);
...
...
@@ -2792,8 +2683,8 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p
info
->
tsType
=
tsType
;
info
->
protocol
=
protocol
;
if
(
numLines
<=
0
||
numLines
>
65536
)
{
tscError
(
"SML:0x%"
PRIx64
" taos_insert_lines numLines should be between 1 and 65536. numLines: %d"
,
info
->
id
,
numLines
);
if
(
numLines
<=
0
||
numLines
>
65536
*
32
)
{
tscError
(
"SML:0x%"
PRIx64
" taos_insert_lines numLines should be between 1 and 65536
*32
. numLines: %d"
,
info
->
id
,
numLines
);
tfree
(
info
);
code
=
TSDB_CODE_TSC_APP_ERROR
;
return
code
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录