Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c4f06ca3
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c4f06ca3
编写于
7月 06, 2021
作者:
S
shenglian zhou
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
develop schemaless
上级
2bf253d3
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
731 addition
and
0 deletion
+731
-0
src/client/src/tscParseLineProtocol.c
src/client/src/tscParseLineProtocol.c
+731
-0
未找到文件。
src/client/src/tscParseLineProtocol.c
0 → 100644
浏览文件 @
c4f06ca3
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "os.h"
#include "osString.h"
#include "ttype.h"
#include "tmd5.h"
#include "tstrbuild.h"
#include "tname.h"
#include "taos.h"
#include "tsclient.h"
#include "tscLog.h"
#include "hash.h"
#include "tskiplist.h"
#include "tscUtil.h"
typedef
enum
{
LP_ITEM_TAG
,
LP_ITEM_FIELD
}
LPItemKind
;
typedef
struct
{
SStrToken
key
;
SStrToken
value
;
char
name
[
TSDB_COL_NAME_LEN
];
int8_t
type
;
int16_t
bytes
;
char
*
payload
;
}
SLPItem
;
typedef
struct
{
SStrToken
measToken
;
SStrToken
tsToken
;
char
sTableName
[
TSDB_TABLE_NAME_LEN
];
SArray
*
tags
;
SArray
*
fields
;
int64_t
ts
;
}
SLPPoint
;
typedef
enum
{
LP_MEASUREMENT
,
LP_TAG_KEY
,
LP_TAG_VALUE
,
LP_FIELD_KEY
,
LP_FIELD_VALUE
}
LPPart
;
int32_t
scanToCommaOrSpace
(
SStrToken
s
,
int32_t
start
,
int32_t
*
index
,
LPPart
part
)
{
for
(
int32_t
i
=
start
;
i
<
s
.
n
;
++
i
)
{
if
(
s
.
z
[
i
]
==
','
||
s
.
z
[
i
]
==
' '
)
{
*
index
=
i
;
return
0
;
}
}
return
-
1
;
}
int32_t
scanToEqual
(
SStrToken
s
,
int32_t
start
,
int32_t
*
index
)
{
for
(
int32_t
i
=
start
;
i
<
s
.
n
;
++
i
)
{
if
(
s
.
z
[
i
]
==
'='
)
{
*
index
=
i
;
return
0
;
}
}
return
-
1
;
}
int32_t
setPointMeasurement
(
SLPPoint
*
point
,
SStrToken
token
)
{
point
->
measToken
=
token
;
if
(
point
->
measToken
.
n
<
TSDB_TABLE_NAME_LEN
)
{
strncpy
(
point
->
sTableName
,
point
->
measToken
.
z
,
point
->
measToken
.
n
);
point
->
sTableName
[
point
->
measToken
.
n
]
=
'\0'
;
}
return
0
;
}
int32_t
setItemKey
(
SLPItem
*
item
,
SStrToken
key
,
LPPart
part
)
{
item
->
key
=
key
;
if
(
item
->
key
.
n
<
TSDB_COL_NAME_LEN
)
{
strncpy
(
item
->
name
,
item
->
key
.
z
,
item
->
key
.
n
);
item
->
name
[
item
->
key
.
n
]
=
'\0'
;
}
return
0
;
}
int32_t
setItemValue
(
SLPItem
*
item
,
SStrToken
value
,
LPPart
part
)
{
item
->
value
=
value
;
return
0
;
}
int32_t
parseItemValue
(
SLPItem
*
item
,
LPItemKind
kind
)
{
char
*
sv
=
item
->
value
.
z
;
char
*
last
=
item
->
value
.
z
+
item
->
value
.
n
-
1
;
if
(
isdigit
(
sv
[
0
])
||
sv
[
0
]
==
'-'
)
{
if
(
*
last
==
'i'
)
{
item
->
type
=
TSDB_DATA_TYPE_BIGINT
;
item
->
bytes
=
(
int16_t
)
tDataTypes
[
item
->
type
].
bytes
;
item
->
payload
=
malloc
(
item
->
bytes
);
char
*
endptr
=
NULL
;
*
(
item
->
payload
)
=
strtoll
(
sv
,
&
endptr
,
10
);
}
else
{
item
->
type
=
TSDB_DATA_TYPE_DOUBLE
;
item
->
bytes
=
(
int16_t
)
tDataTypes
[
item
->
type
].
bytes
;
item
->
payload
=
malloc
(
item
->
bytes
);
char
*
endptr
=
NULL
;
*
(
item
->
payload
)
=
strtold
(
sv
,
&
endptr
);
}
}
else
if
((
sv
[
0
]
==
'L'
&&
sv
[
1
]
==
'"'
)
||
sv
[
0
]
==
'"'
)
{
if
(
sv
[
0
]
==
'L'
)
{
item
->
type
=
TSDB_DATA_TYPE_NCHAR
;
uint32_t
bytes
=
item
->
value
.
n
-
3
;
// uint32_t len = bytes;
// char* ucs = malloc(len);
// int32_t ncharBytes = 0;
// taosMbsToUcs4(sv+2, len, ucs, len, &ncharBytes);
// item->bytes = ncharBytes;
// item->payload = malloc(ncharBytes);
// memcpy(item->payload, ucs, ncharBytes);
// free(ucs);
item
->
bytes
=
bytes
;
item
->
payload
=
malloc
(
bytes
);
memcpy
(
item
->
payload
,
sv
+
1
,
bytes
);
}
else
if
(
sv
[
0
]
==
'"'
){
item
->
type
=
TSDB_DATA_TYPE_BINARY
;
uint32_t
bytes
=
item
->
value
.
n
-
2
;
item
->
bytes
=
bytes
;
item
->
payload
=
malloc
(
bytes
);
memcpy
(
item
->
payload
,
sv
+
1
,
bytes
);
}
}
else
if
(
sv
[
0
]
==
't'
||
sv
[
0
]
==
'f'
||
sv
[
0
]
==
'T'
||
sv
[
0
]
==
'F'
)
{
item
->
type
=
TSDB_DATA_TYPE_BOOL
;
item
->
bytes
=
tDataTypes
[
item
->
type
].
bytes
;
item
->
payload
=
malloc
(
tDataTypes
[
item
->
type
].
bytes
);
*
(
item
->
payload
)
=
tolower
(
sv
[
0
])
==
't'
?
true
:
false
;
}
return
0
;
}
int32_t
compareLPItemKey
(
const
void
*
p1
,
const
void
*
p2
)
{
const
SLPItem
*
t1
=
p1
;
const
SLPItem
*
t2
=
p2
;
uint32_t
min
=
(
t1
->
key
.
n
<
t2
->
key
.
n
)
?
t1
->
key
.
n
:
t2
->
key
.
n
;
int
res
=
strncmp
(
t1
->
key
.
z
,
t2
->
key
.
z
,
min
);
if
(
res
!=
0
)
{
return
res
;
}
else
{
return
(
int
)(
t1
->
key
.
n
)
-
(
int
)(
t2
->
key
.
n
);
}
}
int32_t
setPointTimeStamp
(
SLPPoint
*
point
,
SStrToken
tsToken
)
{
point
->
tsToken
=
tsToken
;
return
0
;
}
int32_t
parsePointTime
(
SLPPoint
*
point
)
{
if
(
point
->
tsToken
.
n
<=
0
)
{
point
->
ts
=
taosGetTimestampNs
();
}
else
{
char
*
endptr
=
NULL
;
point
->
ts
=
strtoll
(
point
->
tsToken
.
z
,
&
endptr
,
10
);
}
return
0
;
}
int32_t
tscParseLine
(
SStrToken
line
,
SLPPoint
*
point
)
{
int32_t
pos
=
0
;
int32_t
start
=
0
;
int32_t
err
=
scanToCommaOrSpace
(
line
,
start
,
&
pos
,
LP_MEASUREMENT
);
if
(
err
!=
0
)
{
tscError
(
"a"
);
return
err
;
}
SStrToken
measurement
=
{.
z
=
line
.
z
+
start
,
.
n
=
pos
-
start
};
setPointMeasurement
(
point
,
measurement
);
point
->
tags
=
taosArrayInit
(
64
,
sizeof
(
SLPItem
));
start
=
pos
+
1
;
while
(
line
.
z
[
start
]
==
','
)
{
SLPItem
item
;
err
=
scanToEqual
(
line
,
start
,
&
pos
);
if
(
err
!=
0
)
{
tscError
(
"b"
);
goto
error
;
}
SStrToken
tagKey
=
{.
z
=
line
.
z
+
start
,
.
n
=
pos
-
start
};
setItemKey
(
&
item
,
tagKey
,
LP_TAG_KEY
);
start
=
pos
+
1
;
err
=
scanToCommaOrSpace
(
line
,
start
,
&
pos
,
LP_TAG_VALUE
);
if
(
err
!=
0
)
{
tscError
(
"c"
);
goto
error
;
}
SStrToken
tagValue
=
{.
z
=
line
.
z
+
start
,
.
n
=
pos
-
start
};
setItemValue
(
&
item
,
tagValue
,
LP_TAG_VALUE
);
parseItemValue
(
&
item
,
LP_ITEM_TAG
);
taosArrayPush
(
point
->
tags
,
&
item
);
start
=
pos
+
1
;
}
taosArraySort
(
point
->
tags
,
compareLPItemKey
);
point
->
fields
=
taosArrayInit
(
64
,
sizeof
(
SLPItem
));
do
{
SLPItem
item
;
err
=
scanToEqual
(
line
,
start
,
&
pos
);
if
(
err
!=
0
)
{
goto
error
;
}
SStrToken
fieldKey
=
{.
z
=
line
.
z
+
start
,
.
n
=
pos
-
start
};
setItemKey
(
&
item
,
fieldKey
,
LP_FIELD_KEY
);
start
=
pos
+
1
;
err
=
scanToCommaOrSpace
(
line
,
start
,
&
pos
,
LP_FIELD_VALUE
);
if
(
err
!=
0
)
{
goto
error
;
}
SStrToken
fieldValue
=
{.
z
=
line
.
z
+
start
,
.
n
=
pos
-
start
};
setItemValue
(
&
item
,
fieldValue
,
LP_TAG_VALUE
);
parseItemValue
(
&
item
,
LP_ITEM_FIELD
);
taosArrayPush
(
point
->
fields
,
&
item
);
start
=
pos
+
1
;
}
while
(
line
.
z
[
pos
]
==
','
);
taosArraySort
(
point
->
fields
,
compareLPItemKey
);
SStrToken
tsToken
=
{.
z
=
line
.
z
+
start
,
.
n
=
line
.
n
-
start
};
setPointTimeStamp
(
point
,
tsToken
);
parsePointTime
(
point
);
goto
done
;
error:
// free array
return
err
;
done:
return
0
;
}
int32_t
tscParseLines
(
char
*
lines
[],
int
numLines
,
SArray
*
points
,
SArray
*
failedLines
)
{
for
(
int32_t
i
=
0
;
i
<
numLines
;
++
i
)
{
SStrToken
tkLine
=
{.
z
=
lines
[
i
],
.
n
=
strlen
(
lines
[
i
])
+
1
};
SLPPoint
point
;
tscParseLine
(
tkLine
,
&
point
);
taosArrayPush
(
points
,
&
point
);
}
return
0
;
}
TAOS_RES
*
taos_insert_by_lines
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
)
{
SArray
*
points
=
taosArrayInit
(
numLines
,
sizeof
(
SLPPoint
));
tscParseLines
(
lines
,
numLines
,
points
,
NULL
);
return
NULL
;
}
//=================================================================================================
typedef
struct
{
char
*
key
;
uint8_t
type
;
int16_t
length
;
char
*
value
;
}
TAOS_SML_KV
;
typedef
struct
{
char
*
stableName
;
char
*
childTableName
;
TAOS_SML_KV
*
tags
;
int
tagNum
;
// first kv must be timestamp
TAOS_SML_KV
*
fields
;
int
fieldNum
;
}
TAOS_SML_DATA_POINT
;
typedef
struct
{
char
sTableName
[
TSDB_TABLE_NAME_LEN
];
SHashObj
*
tagHash
;
SHashObj
*
fieldHash
;
SArray
*
tags
;
//SArray<SSchema>
SArray
*
fields
;
//SArray<TAOS_SFIELD>
}
SSmlSTableSchema
;
int
compareSmlColKv
(
const
void
*
p1
,
const
void
*
p2
)
{
TAOS_SML_KV
*
kv1
=
(
TAOS_SML_KV
*
)
p1
;
TAOS_SML_KV
*
kv2
=
(
TAOS_SML_KV
*
)
p2
;
int
kvLen1
=
(
int
)
strlen
(
kv1
->
key
);
int
kvLen2
=
(
int
)
strlen
(
kv2
->
key
);
int
res
=
strncasecmp
(
kv1
->
key
,
kv2
->
key
,
MIN
(
kvLen1
,
kvLen2
));
if
(
res
!=
0
)
{
return
res
;
}
else
{
return
kvLen1
-
kvLen2
;
}
}
int32_t
getChildTableName
(
TAOS_SML_DATA_POINT
*
point
,
char
*
tableName
,
int
*
tableNameLen
)
{
qsort
(
point
->
tags
,
point
->
tagNum
,
sizeof
(
TAOS_SML_KV
),
compareSmlColKv
);
SStringBuilder
sb
;
memset
(
&
sb
,
0
,
sizeof
(
sb
));
taosStringBuilderAppendString
(
&
sb
,
point
->
stableName
);
for
(
int
j
=
0
;
j
<
point
->
tagNum
;
++
j
)
{
TAOS_SML_KV
*
tagKv
=
point
->
tags
+
j
;
taosStringBuilderAppendChar
(
&
sb
,
','
);
taosStringBuilderAppendString
(
&
sb
,
tagKv
->
key
);
taosStringBuilderAppendChar
(
&
sb
,
'='
);
taosStringBuilderAppend
(
&
sb
,
tagKv
->
value
,
tagKv
->
length
);
}
size_t
len
=
0
;
char
*
keyJoined
=
taosStringBuilderGetResult
(
&
sb
,
&
len
);
MD5_CTX
context
;
MD5Init
(
&
context
);
MD5Update
(
&
context
,
(
uint8_t
*
)
keyJoined
,
(
uint32_t
)
len
);
MD5Final
(
&
context
);
*
tableNameLen
=
snprintf
(
tableName
,
*
tableNameLen
,
"%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x"
,
context
.
digest
[
0
],
context
.
digest
[
1
],
context
.
digest
[
2
],
context
.
digest
[
3
],
context
.
digest
[
4
],
context
.
digest
[
5
],
context
.
digest
[
6
],
context
.
digest
[
7
],
context
.
digest
[
8
],
context
.
digest
[
9
],
context
.
digest
[
10
],
context
.
digest
[
11
],
context
.
digest
[
12
],
context
.
digest
[
13
],
context
.
digest
[
14
],
context
.
digest
[
15
]);
return
0
;
}
int32_t
loadTableMeta
(
TAOS
*
taos
,
char
*
tableName
,
SSmlSTableSchema
*
schema
)
{
int32_t
code
=
0
;
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
{
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
return
TSDB_CODE_TSC_DISCONNECTED
;
}
char
sql
[
256
];
snprintf
(
sql
,
256
,
"describe %s"
,
tableName
);
TAOS_RES
*
res
=
taos_query
(
taos
,
sql
);
code
=
taos_errno
(
res
);
if
(
code
!=
0
)
{
taos_free_result
(
res
);
return
code
;
}
taos_free_result
(
res
);
SSqlObj
*
pSql
=
calloc
(
1
,
sizeof
(
SSqlObj
));
pSql
->
pTscObj
=
taos
;
pSql
->
signature
=
pSql
;
pSql
->
fp
=
NULL
;
SStrToken
tableToken
=
{.
z
=
tableName
,
.
n
=
strlen
(
tableName
),
.
type
=
TK_ID
};
tGetToken
(
tableName
,
&
tableToken
.
type
);
// Check if the table name available or not
if
(
tscValidateName
(
&
tableToken
)
!=
TSDB_CODE_SUCCESS
)
{
code
=
TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH
;
sprintf
(
pSql
->
cmd
.
payload
,
"table name is invalid"
);
return
code
;
}
SName
sname
=
{
0
};
if
((
code
=
tscSetTableFullName
(
&
sname
,
&
tableToken
,
pSql
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
char
fullTableName
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
memset
(
fullTableName
,
0
,
tListLen
(
fullTableName
));
tNameExtractFullName
(
&
sname
,
fullTableName
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscFreeSqlObj
(
pSql
);
return
code
;
}
tscFreeSqlObj
(
pSql
);
uint32_t
size
=
tscGetTableMetaMaxSize
();
STableMeta
*
tableMeta
=
calloc
(
1
,
size
);
taosHashGetClone
(
tscTableMetaInfo
,
fullTableName
,
strlen
(
fullTableName
),
NULL
,
tableMeta
,
-
1
);
tstrncpy
(
schema
->
sTableName
,
tableName
,
strlen
(
tableName
));
for
(
int
i
=
0
;
i
<
tableMeta
->
tableInfo
.
numOfColumns
;
++
i
)
{
SSchema
field
;
tstrncpy
(
field
.
name
,
tableMeta
->
schema
[
i
].
name
,
strlen
(
tableMeta
->
schema
[
i
].
name
));
field
.
type
=
tableMeta
->
schema
[
i
].
type
;
field
.
bytes
=
tableMeta
->
schema
[
i
].
bytes
;
SSchema
*
pField
=
taosArrayPush
(
schema
->
fields
,
&
field
);
taosHashPut
(
schema
->
fieldHash
,
field
.
name
,
strlen
(
field
.
name
),
&
pField
,
POINTER_BYTES
);
}
for
(
int
i
=
0
;
i
<
tableMeta
->
tableInfo
.
numOfTags
;
++
i
)
{
int
j
=
i
+
tableMeta
->
tableInfo
.
numOfColumns
;
SSchema
field
;
tstrncpy
(
field
.
name
,
tableMeta
->
schema
[
j
].
name
,
strlen
(
tableMeta
->
schema
[
j
].
name
));
field
.
type
=
tableMeta
->
schema
[
j
].
type
;
field
.
bytes
=
tableMeta
->
schema
[
j
].
bytes
;
SSchema
*
pField
=
taosArrayPush
(
schema
->
tags
,
&
field
);
taosHashPut
(
schema
->
tagHash
,
field
.
name
,
strlen
(
field
.
name
),
&
pField
,
POINTER_BYTES
);
}
return
code
;
}
typedef
enum
{
SCHEMA_ACTION_CREATE_STABLE
,
SCHEMA_ACTION_ADD_COLUMN
,
SCHEMA_ACTION_ADD_TAG
,
SCHEMA_ACTION_CHANGE_COLUMN_SIZE
,
SCHEMA_ACTION_CHANGE_TAG_SIZE
,
SCHEMA_ACTION_CREATE_CTABLE
}
ESchemaAction
;
typedef
struct
{
char
sTableName
[
TSDB_TABLE_NAME_LEN
];
SArray
*
tags
;
//SArray<SSchema>
SArray
*
fields
;
//SArray<SSchema>
}
SCreateSTableActionInfo
;
typedef
struct
{
char
sTableName
[
TSDB_TABLE_NAME_LEN
];
SSchema
*
field
;
}
SAlterSTableActionInfo
;
typedef
struct
{
char
sTableName
[
TSDB_TABLE_NAME_LEN
];
char
cTableName
[
TSDB_TABLE_NAME_LEN
];
TAOS_SML_KV
*
tags
;
int
tagNum
;
}
SCreateCTableActionInfo
;
typedef
struct
{
ESchemaAction
action
;
union
{
SCreateSTableActionInfo
createSTable
;
SAlterSTableActionInfo
alterSTable
;
SCreateCTableActionInfo
createCTable
;
};
}
SSchemaAction
;
int32_t
getFieldBytesFromSmlKv
(
TAOS_SML_KV
*
kv
,
int32_t
*
bytes
)
{
if
(
!
IS_VAR_DATA_TYPE
(
kv
->
type
))
{
*
bytes
=
tDataTypes
[
kv
->
type
].
bytes
;
}
else
{
if
(
kv
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
char
*
ucs
=
malloc
(
kv
->
length
*
TSDB_NCHAR_SIZE
+
1
);
int32_t
bytesNeeded
=
0
;
//todo check conversion succeed
taosMbsToUcs4
(
kv
->
value
,
kv
->
length
,
ucs
,
kv
->
length
*
TSDB_NCHAR_SIZE
,
&
bytesNeeded
);
free
(
ucs
);
*
bytes
=
bytesNeeded
+
VARSTR_HEADER_SIZE
;
}
else
if
(
kv
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
*
bytes
=
kv
->
length
+
VARSTR_HEADER_SIZE
;
}
}
return
0
;
}
int32_t
addTaosFieldToHashAndArray
(
TAOS_SML_KV
*
smlKv
,
SHashObj
*
hash
,
SArray
*
array
)
{
SSchema
*
pField
=
NULL
;
SSchema
**
ppField
=
taosHashGet
(
hash
,
smlKv
->
key
,
strlen
(
smlKv
->
key
));
if
(
ppField
)
{
pField
=
*
ppField
;
if
(
pField
->
type
!=
smlKv
->
type
)
{
//TODO:
tscError
(
"type mismatch"
);
return
-
1
;
}
int32_t
bytes
=
0
;
getFieldBytesFromSmlKv
(
smlKv
,
&
bytes
);
pField
->
bytes
=
MAX
(
pField
->
bytes
,
bytes
);
}
else
{
SSchema
field
;
size_t
tagKeyLen
=
strlen
(
smlKv
->
key
);
strncpy
(
field
.
name
,
smlKv
->
key
,
tagKeyLen
);
field
.
name
[
tagKeyLen
]
=
'\0'
;
field
.
type
=
smlKv
->
type
;
int32_t
bytes
=
0
;
getFieldBytesFromSmlKv
(
smlKv
,
&
bytes
);
field
.
bytes
=
bytes
;
pField
=
taosArrayPush
(
array
,
&
field
);
taosHashPut
(
hash
,
field
.
name
,
tagKeyLen
,
&
pField
,
POINTER_BYTES
);
}
return
0
;
}
int32_t
generateSchemaAction
(
SSchema
*
pointColField
,
SHashObj
*
dbAttrHash
,
bool
isTag
,
char
sTableName
[],
SSchemaAction
*
action
,
bool
*
actionNeeded
)
{
SSchema
**
ppDbAttr
=
taosHashGet
(
dbAttrHash
,
pointColField
->
name
,
strlen
(
pointColField
->
name
));
if
(
*
ppDbAttr
)
{
SSchema
*
dbAttr
=
*
ppDbAttr
;
if
(
pointColField
->
type
!=
dbAttr
->
type
)
{
//todo error
return
-
5
;
}
if
(
IS_VAR_DATA_TYPE
(
pointColField
->
type
)
&&
(
pointColField
->
bytes
>
dbAttr
->
bytes
))
{
if
(
isTag
)
{
action
->
action
=
SCHEMA_ACTION_CHANGE_TAG_SIZE
;
}
else
{
action
->
action
=
SCHEMA_ACTION_CHANGE_COLUMN_SIZE
;
}
memset
(
&
action
->
alterSTable
,
0
,
sizeof
(
SAlterSTableActionInfo
));
memcpy
(
action
->
alterSTable
.
sTableName
,
sTableName
,
TSDB_TABLE_NAME_LEN
);
action
->
alterSTable
.
field
=
pointColField
;
*
actionNeeded
=
true
;
}
}
else
{
if
(
isTag
)
{
action
->
action
=
SCHEMA_ACTION_ADD_TAG
;
}
else
{
action
->
action
=
SCHEMA_ACTION_ADD_COLUMN
;
}
memset
(
&
action
->
alterSTable
,
0
,
sizeof
(
SAlterSTableActionInfo
));
memcpy
(
action
->
alterSTable
.
sTableName
,
sTableName
,
TSDB_TABLE_NAME_LEN
);
action
->
alterSTable
.
field
=
pointColField
;
*
actionNeeded
=
true
;
}
return
0
;
}
int
taos_sml_insert
(
TAOS
*
taos
,
TAOS_SML_DATA_POINT
*
points
,
int
numPoint
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SArray
*
stableArray
=
taosArrayInit
(
32
,
sizeof
(
SSmlSTableSchema
));
// SArray<STableColumnsSchema>
SHashObj
*
sname2shema
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
for
(
int
i
=
0
;
i
<
numPoint
;
++
i
)
{
TAOS_SML_DATA_POINT
*
point
=
&
points
[
i
];
SSmlSTableSchema
**
ppStableSchema
=
taosHashGet
(
sname2shema
,
point
->
stableName
,
TSDB_TABLE_NAME_LEN
);
SSmlSTableSchema
*
pStableSchema
=
NULL
;
if
(
ppStableSchema
)
{
pStableSchema
=
*
ppStableSchema
;
}
else
{
SSmlSTableSchema
schema
;
size_t
stableNameLen
=
strlen
(
point
->
stableName
);
strncpy
(
schema
.
sTableName
,
point
->
stableName
,
stableNameLen
);
schema
.
sTableName
[
stableNameLen
]
=
'\0'
;
schema
.
fields
=
taosArrayInit
(
64
,
sizeof
(
SSchema
));
schema
.
tags
=
taosArrayInit
(
8
,
sizeof
(
SSchema
));
schema
.
tagHash
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
schema
.
fieldHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
pStableSchema
=
taosArrayPush
(
stableArray
,
&
schema
);
taosHashPut
(
sname2shema
,
schema
.
sTableName
,
stableNameLen
,
&
pStableSchema
,
POINTER_BYTES
);
}
for
(
int
j
=
0
;
j
<
point
->
tagNum
;
++
j
)
{
TAOS_SML_KV
*
tagKv
=
point
->
tags
+
j
;
addTaosFieldToHashAndArray
(
tagKv
,
pStableSchema
->
tagHash
,
pStableSchema
->
tags
);
}
for
(
int
j
=
0
;
j
<
point
->
fieldNum
;
++
j
)
{
TAOS_SML_KV
*
fieldKv
=
point
->
fields
+
j
;
addTaosFieldToHashAndArray
(
fieldKv
,
pStableSchema
->
fieldHash
,
pStableSchema
->
fields
);
}
}
SArray
*
schemaActions
=
taosArrayInit
(
32
,
sizeof
(
SSchemaAction
));
size_t
numStable
=
taosArrayGetSize
(
stableArray
);
for
(
int
i
=
0
;
i
<
numStable
;
++
i
)
{
SSmlSTableSchema
*
pointSchema
=
taosArrayGet
(
stableArray
,
i
);
SSmlSTableSchema
dbSchema
=
{
0
};
dbSchema
.
fields
=
taosArrayInit
(
64
,
sizeof
(
SSchema
));
dbSchema
.
tags
=
taosArrayInit
(
8
,
sizeof
(
SSchema
));
dbSchema
.
tagHash
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
dbSchema
.
fieldHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
code
=
loadTableMeta
(
taos
,
pointSchema
->
sTableName
,
&
dbSchema
);
if
(
code
==
TSDB_CODE_MND_INVALID_TABLE_NAME
)
{
SSchemaAction
schemaAction
=
{
0
};
schemaAction
.
action
=
SCHEMA_ACTION_CREATE_STABLE
;
memset
(
&
schemaAction
.
createSTable
,
0
,
sizeof
(
SCreateSTableActionInfo
));
memcpy
(
schemaAction
.
createSTable
.
sTableName
,
pointSchema
->
sTableName
,
TSDB_TABLE_NAME_LEN
);
schemaAction
.
createSTable
.
tags
=
pointSchema
->
tags
;
schemaAction
.
createSTable
.
fields
=
pointSchema
->
fields
;
taosArrayPush
(
schemaActions
,
&
schemaAction
);
}
else
if
(
code
==
TSDB_CODE_SUCCESS
)
{
size_t
pointTagSize
=
taosArrayGetSize
(
pointSchema
->
tags
);
size_t
pointFieldSize
=
taosArrayGetSize
(
pointSchema
->
fields
);
SHashObj
*
dbTagHash
=
dbSchema
.
tagHash
;
SHashObj
*
dbFieldHash
=
dbSchema
.
fieldHash
;
for
(
int
j
=
0
;
j
<
pointTagSize
;
++
j
)
{
SSchema
*
pointTag
=
taosArrayGet
(
pointSchema
->
tags
,
j
);
SSchemaAction
schemaAction
=
{
0
};
bool
actionNeeded
=
false
;
generateSchemaAction
(
pointTag
,
dbTagHash
,
true
,
pointSchema
->
sTableName
,
&
schemaAction
,
&
actionNeeded
);
if
(
actionNeeded
)
{
taosArrayPush
(
schemaActions
,
&
schemaAction
);
}
}
for
(
int
j
=
0
;
j
<
pointFieldSize
;
++
j
)
{
SSchema
*
pointCol
=
taosArrayGet
(
pointSchema
->
tags
,
j
);
SSchemaAction
schemaAction
=
{
0
};
bool
actionNeeded
=
false
;
generateSchemaAction
(
pointCol
,
dbFieldHash
,
false
,
pointSchema
->
sTableName
,
&
schemaAction
,
&
actionNeeded
);
if
(
actionNeeded
)
{
taosArrayPush
(
schemaActions
,
&
schemaAction
);
}
}
}
else
{
return
code
;
}
}
return
code
;
}
int32_t
buildColumnDescription
(
SSchema
*
field
,
char
*
buf
,
int32_t
bufSize
,
int32_t
*
outBytes
)
{
uint8_t
type
=
field
->
type
;
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
int32_t
bytes
=
field
->
bytes
-
VARSTR_HEADER_SIZE
;
if
(
type
==
TSDB_DATA_TYPE_NCHAR
)
{
bytes
=
bytes
/
TSDB_NCHAR_SIZE
;
}
int
out
=
snprintf
(
buf
,
bufSize
,
"%s %s(%d)"
,
field
->
name
,
tDataTypes
[
field
->
type
].
name
,
bytes
);
*
outBytes
=
out
;
}
else
{
int
out
=
snprintf
(
buf
,
bufSize
,
"%s %s"
,
field
->
name
,
tDataTypes
[
type
].
name
);
*
outBytes
=
out
;
}
return
0
;
}
int32_t
applySchemaAction
(
TAOS
*
taos
,
SSchemaAction
*
action
)
{
int32_t
code
=
0
;
int32_t
capacity
=
TSDB_MAX_BINARY_LEN
;
int32_t
outBytes
=
0
;
char
*
result
=
(
char
*
)
calloc
(
1
,
capacity
);
switch
(
action
->
action
)
{
case
SCHEMA_ACTION_ADD_COLUMN
:
{
int
n
=
sprintf
(
result
,
"alter stable %s add column "
,
action
->
alterSTable
.
sTableName
);
buildColumnDescription
(
action
->
alterSTable
.
field
,
result
+
n
,
capacity
-
n
,
&
outBytes
);
TAOS_RES
*
res
=
taos_query
(
taos
,
result
);
//TODO async doAsyncQuery
code
=
taos_errno
(
res
);
break
;
}
case
SCHEMA_ACTION_ADD_TAG
:
{
int
n
=
sprintf
(
result
,
"alter stable %s add tag "
,
action
->
alterSTable
.
sTableName
);
buildColumnDescription
(
action
->
alterSTable
.
field
,
result
+
n
,
capacity
-
n
,
&
outBytes
);
TAOS_RES
*
res
=
taos_query
(
taos
,
result
);
//TODO async doAsyncQuery
code
=
taos_errno
(
res
);
break
;
}
case
SCHEMA_ACTION_CHANGE_COLUMN_SIZE
:
{
int
n
=
sprintf
(
result
,
"alter stable %s modify column "
,
action
->
alterSTable
.
sTableName
);
buildColumnDescription
(
action
->
alterSTable
.
field
,
result
+
n
,
capacity
-
n
,
&
outBytes
);
TAOS_RES
*
res
=
taos_query
(
taos
,
result
);
//TODO async doAsyncQuery
code
=
taos_errno
(
res
);
}
case
SCHEMA_ACTION_CHANGE_TAG_SIZE
:
{
int
n
=
sprintf
(
result
,
"alter stable %s modify tag "
,
action
->
alterSTable
.
sTableName
);
buildColumnDescription
(
action
->
alterSTable
.
field
,
result
+
n
,
capacity
-
n
,
&
outBytes
);
TAOS_RES
*
res
=
taos_query
(
taos
,
result
);
//TODO async doAsyncQuery
code
=
taos_errno
(
res
);
break
;
}
case
SCHEMA_ACTION_CREATE_STABLE
:
{
int
n
=
sprintf
(
result
,
"create stable %s ("
,
action
->
createSTable
.
sTableName
);
char
*
pos
=
result
+
n
;
int
freeBytes
=
capacity
-
n
;
int
numCols
=
taosArrayGetSize
(
action
->
createSTable
.
fields
);
for
(
int32_t
i
=
0
;
i
<
numCols
;
++
i
)
{
SSchema
*
field
=
taosArrayGet
(
action
->
createSTable
.
fields
,
i
);
buildColumnDescription
(
field
,
pos
,
freeBytes
,
&
outBytes
);
pos
+=
outBytes
;
freeBytes
-=
outBytes
;
*
pos
=
','
;
++
pos
;
--
freeBytes
;
}
--
pos
;
++
freeBytes
;
outBytes
=
snprintf
(
pos
,
freeBytes
,
") tags ("
);
int
numTags
=
taosArrayGetSize
(
action
->
createSTable
.
tags
);
pos
+=
outBytes
;
freeBytes
-=
outBytes
;
for
(
int32_t
i
=
0
;
i
<
numTags
;
++
i
)
{
SSchema
*
field
=
taosArrayGet
(
action
->
createSTable
.
tags
,
i
);
buildColumnDescription
(
field
,
pos
,
freeBytes
,
&
outBytes
);
pos
+=
outBytes
;
freeBytes
-=
outBytes
;
*
pos
=
','
;
++
pos
;
--
freeBytes
;
}
pos
--
;
++
freeBytes
;
outBytes
=
snprintf
(
pos
,
freeBytes
,
")"
);
TAOS_RES
*
res
=
taos_query
(
taos
,
result
);
code
=
taos_errno
(
res
);
break
;
}
case
SCHEMA_ACTION_CREATE_CTABLE
:
{
break
;
}
default:
break
;
}
free
(
result
);
return
code
;
}
//todo: table/column length check
//todo: type check
//todo: taosmbs2ucs4 check
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录