Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
bcc1b406
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
bcc1b406
编写于
8月 09, 2019
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix issue #313
上级
744d31cb
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
220 addition
and
623 deletion
+220
-623
src/modules/http/src/tgHandle.c
src/modules/http/src/tgHandle.c
+220
-623
未找到文件。
src/modules/http/src/tgHandle.c
浏览文件 @
bcc1b406
...
...
@@ -19,6 +19,43 @@
#include "tgJson.h"
#include "tsdb.h"
/*
* taos.telegraf.cfg formats like
{
"metrics": [
{
"name" : "system",
"tbname" : "system_uptime",
"fields": [
"uptime"
]
},
{
"name": "system",
"tbname" : "system_uptime_format",
"fields": [
"uptime_format"
]
},
{
"name": "swap",
"tbname" : "swap_in",
"fields": [
"in"
]
},
{
"name": "cpu",
"tbname" : "cpu_time",
"fields": [
"time_active",
"time_guest"
]
}
]
}
*/
#define TG_MAX_SORT_TAG_SIZE 20
static
HttpDecodeMethod
tgDecodeMethod
=
{
"telegraf"
,
tgProcessRquest
};
...
...
@@ -26,80 +63,72 @@ static HttpEncodeMethod tgQueryMethod = {tgStartQueryJson, tgStopQueryJs
tgBuildSqlAffectRowsJson
,
tgInitQueryJson
,
tgCleanQueryJson
,
tgCheckFinished
,
tgSetNextCmd
};
typedef
struct
{
char
*
tagName
;
char
*
tagAlias
;
char
*
tagType
;
}
STgTag
;
static
const
char
DEFAULT_TELEGRAF_CFG
[]
=
"{
\"
metrics
\"
:["
"{
\"
name
\"
:
\"
system
\"
,
\"
tbname
\"
:
\"
system_uptime
\"
,
\"
fields
\"
:[
\"
uptime
\"
]},"
"{
\"
name
\"
:
\"
system
\"
,
\"
tbname
\"
:
\"
system_uptime_format
\"
,
\"
fields
\"
:[
\"
uptime_format
\"
]},"
"{
\"
name
\"
:
\"
swap
\"
,
\"
tbname
\"
:
\"
swap_in
\"
,
\"
fields
\"
:[
\"
in
\"
]},"
"{
\"
name
\"
:
\"
cpu
\"
,
\"
tbname
\"
:
\"
cpu_time
\"
,
\"
fields
\"
:[
\"
time_active
\"
,
\"
time_guest
\"
]}"
"]}"
;
typedef
struct
{
char
*
fieldName
;
char
*
fieldAlias
;
char
*
fieldType
;
}
STgField
;
char
*
name
;
char
*
tbName
;
char
**
fields
;
int
fieldNum
;
}
STgSchema
;
typedef
struct
{
char
*
stName
;
char
*
stAlias
;
STgTag
*
tags
;
STgField
*
fields
;
int16_t
tagNum
;
int16_t
fieldNum
;
char
*
createSTableStr
;
}
STgStable
;
STgSchema
*
schemas
;
int
size
;
int
pos
;
}
STgSchemas
;
/*
* hash of STgStable
*/
static
void
*
tgSchemaHash
=
NULL
;
static
STgSchemas
tgSchemas
=
{
0
};
/*
* formats like
* behind the midline is an alias of field/tag/stable
{
"metrics": [{
"name": "win_cpu-cpu",
"fields": {
"Percent_DPC_Time": "float",
"Percent_Idle_Time": "float",
"Percent_Interrupt_Time": "float",
"Percent_Privileged_Time": "float",
"Percent_Processor_Time": "float",
"Percent_User_Time": "float"
},
"tags": {
"host": "binary(32)",
"instance": "binary(32)",
"objectname": "binary(32)"
void
tgFreeSchema
(
STgSchema
*
schema
)
{
if
(
schema
->
name
!=
NULL
)
{
free
(
schema
->
name
);
schema
->
name
=
NULL
;
}
if
(
schema
->
tbName
!=
NULL
)
{
free
(
schema
->
tbName
);
schema
->
tbName
=
NULL
;
}
if
(
schema
->
fields
!=
NULL
)
{
for
(
int
f
=
0
;
f
<
schema
->
fieldNum
;
++
f
)
{
if
(
schema
->
fields
[
f
]
!=
NULL
)
{
free
(
schema
->
fields
[
f
]);
schema
->
fields
[
f
]
=
NULL
;
}
}
},
{
"fields": {
"Bytes_Received_persec-f1": "float",
"Bytes_Sent_persec-f2": "float",
"Packets_Outbound_Discarded-f3": "float",
"Packets_Outbound_Errors-f4": "float",
"Packets_Received_Discarded-f5": "float",
"Packets_Received_Errors": "float",
"Packets_Received_persec": "float",
"Packets_Sent_persec": "float"
},
"name": "win_net",
"tags": {
"host": "binary(32)",
"instance": "binary(32)",
"objectname": "binary(32)"
},
"timestamp": 1536219762000
}]
free
(
schema
->
fields
);
schema
->
fields
=
NULL
;
schema
->
fieldNum
=
0
;
}
*/
void
tgReadSchemaMetric
(
cJSON
*
metric
)
{
STgStable
stable
=
{
0
};
int
createSTableStrLen
=
100
;
bool
parsedOk
=
true
;
}
// stable name
void
tgFreeSchemas
()
{
if
(
tgSchemas
.
schemas
!=
NULL
)
{
for
(
int
s
=
0
;
s
<
tgSchemas
.
size
;
++
s
)
{
tgFreeSchema
(
&
tgSchemas
.
schemas
[
s
]);
}
free
(
tgSchemas
.
schemas
);
tgSchemas
.
size
=
0
;
}
}
void
tgInitSchemas
(
int
size
)
{
tgFreeSchemas
();
tgSchemas
.
schemas
=
calloc
(
sizeof
(
STgSchema
),
size
);
tgSchemas
.
size
=
0
;
}
void
tgParseSchemaMetric
(
cJSON
*
metric
)
{
STgSchema
schema
=
{
0
};
bool
parsedOk
=
true
;
// name
cJSON
*
name
=
cJSON_GetObjectItem
(
metric
,
"name"
);
if
(
name
==
NULL
)
{
parsedOk
=
false
;
...
...
@@ -118,331 +147,143 @@ void tgReadSchemaMetric(cJSON *metric) {
parsedOk
=
false
;
goto
ParseEnd
;
}
int
aliasPos
=
-
1
;
for
(
int
i
=
0
;
i
<
nameLen
-
1
;
++
i
)
{
if
(
name
->
valuestring
[
i
]
==
'-'
)
{
aliasPos
=
i
;
break
;
}
schema
.
name
=
calloc
(
nameLen
+
1
,
1
);
strcpy
(
schema
.
name
,
name
->
valuestring
);
// tbname
cJSON
*
tbname
=
cJSON_GetObjectItem
(
metric
,
"tbname"
);
if
(
tbname
==
NULL
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
if
(
aliasPos
==
-
1
)
{
stable
.
stName
=
stable
.
stAlias
=
calloc
((
size_t
)
nameLen
+
1
,
1
);
strcpy
(
stable
.
stName
,
name
->
valuestring
);
createSTableStrLen
+=
nameLen
;
}
else
{
stable
.
stName
=
calloc
((
size_t
)
aliasPos
+
1
,
1
);
stable
.
stAlias
=
calloc
((
size_t
)(
nameLen
-
aliasPos
),
1
);
strncpy
(
stable
.
stName
,
name
->
valuestring
,
(
size_t
)
aliasPos
);
strncpy
(
stable
.
stAlias
,
name
->
valuestring
+
aliasPos
+
1
,
(
size_t
)(
nameLen
-
aliasPos
-
1
));
createSTableStrLen
+=
(
nameLen
-
aliasPos
);
if
(
tbname
->
type
!=
cJSON_String
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
// tags
cJSON
*
tags
=
cJSON_GetObjectItem
(
metric
,
"tags"
);
if
(
tags
==
NULL
)
{
if
(
tbname
->
valuestring
==
NULL
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
int
t
agsSize
=
cJSON_GetArraySize
(
tags
);
if
(
t
agsSize
<=
0
||
tagsSize
>
TSDB_MAX_TAGS
)
{
int
t
bnameLen
=
(
int
)
strlen
(
tbname
->
valuestring
);
if
(
t
bnameLen
==
0
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
stable
.
tags
=
calloc
(
sizeof
(
STgTag
),
(
size_t
)
tagsSize
);
stable
.
tagNum
=
(
int16_t
)
tagsSize
;
for
(
int
i
=
0
;
i
<
tagsSize
;
i
++
)
{
STgTag
*
tagSchema
=
&
stable
.
tags
[
i
];
cJSON
*
tag
=
cJSON_GetArrayItem
(
tags
,
i
);
if
(
tag
==
NULL
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
if
(
tag
->
string
==
NULL
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
int
nameLen
=
(
int
)
strlen
(
tag
->
string
);
if
(
nameLen
==
0
||
nameLen
>
TSDB_METER_NAME_LEN
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
int
aliasPos
=
-
1
;
for
(
int
i
=
0
;
i
<
nameLen
-
1
;
++
i
)
{
if
(
tag
->
string
[
i
]
==
'-'
)
{
aliasPos
=
i
;
break
;
}
}
if
(
aliasPos
==
-
1
)
{
tagSchema
->
tagName
=
calloc
((
size_t
)
nameLen
+
1
,
1
);
strcpy
(
tagSchema
->
tagName
,
tag
->
string
);
tagSchema
->
tagAlias
=
calloc
((
size_t
)
nameLen
+
3
,
1
);
strcpy
(
tagSchema
->
tagAlias
,
"t_"
);
strcpy
(
tagSchema
->
tagAlias
+
2
,
tag
->
string
);
createSTableStrLen
+=
(
nameLen
+
4
);
}
else
{
tagSchema
->
tagName
=
calloc
((
size_t
)
aliasPos
+
1
,
1
);
tagSchema
->
tagAlias
=
calloc
((
size_t
)(
nameLen
-
aliasPos
),
1
);
strncpy
(
tagSchema
->
tagName
,
tag
->
string
,
(
size_t
)
aliasPos
);
strncpy
(
tagSchema
->
tagAlias
,
tag
->
string
+
aliasPos
+
1
,
(
size_t
)(
nameLen
-
aliasPos
-
1
));
createSTableStrLen
+=
(
nameLen
-
aliasPos
+
2
);
}
if
(
tag
->
type
==
cJSON_String
)
{
if
(
tag
->
valuestring
==
NULL
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
int
valueLen
=
(
int
)
strlen
(
tag
->
valuestring
);
if
(
valueLen
==
0
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
if
(
strcasecmp
(
tag
->
valuestring
,
"timestamp"
)
==
0
||
strcasecmp
(
tag
->
valuestring
,
"bool"
)
==
0
||
strcasecmp
(
tag
->
valuestring
,
"tinyint"
)
==
0
||
strcasecmp
(
tag
->
valuestring
,
"smallint"
)
==
0
||
strcasecmp
(
tag
->
valuestring
,
"int"
)
==
0
||
strcasecmp
(
tag
->
valuestring
,
"bigint"
)
==
0
||
strcasecmp
(
tag
->
valuestring
,
"float"
)
==
0
||
strcasecmp
(
tag
->
valuestring
,
"double"
)
==
0
||
strncasecmp
(
tag
->
valuestring
,
"binary"
,
6
)
==
0
||
strncasecmp
(
tag
->
valuestring
,
"nchar"
,
5
)
==
0
)
{
tagSchema
->
tagType
=
calloc
((
size_t
)
valueLen
+
1
,
1
);
strcpy
(
tagSchema
->
tagType
,
tag
->
valuestring
);
createSTableStrLen
+=
valueLen
;
}
else
{
tagSchema
->
tagType
=
calloc
(
11
,
1
);
strcpy
(
tagSchema
->
tagType
,
"binary(32)"
);
createSTableStrLen
+=
12
;
}
}
else
if
(
tag
->
type
==
cJSON_False
||
tag
->
type
==
cJSON_True
)
{
tagSchema
->
tagType
=
calloc
(
8
,
1
);
strcpy
(
tagSchema
->
tagType
,
"tinyint"
);
createSTableStrLen
+=
10
;
}
else
{
tagSchema
->
tagType
=
calloc
(
7
,
1
);
strcpy
(
tagSchema
->
tagType
,
"bigint"
);
createSTableStrLen
+=
9
;
}
}
schema
.
tbName
=
calloc
(
tbnameLen
+
1
,
1
);
strcpy
(
schema
.
tbName
,
tbname
->
valuestring
);
// fields
// fields
cJSON
*
fields
=
cJSON_GetObjectItem
(
metric
,
"fields"
);
if
(
fields
==
NULL
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
int
fieldSize
=
cJSON_GetArraySize
(
fields
);
if
(
fieldSize
<=
0
||
fieldSize
>
TSDB_MAX_COLUMNS
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
stable
.
fields
=
calloc
(
sizeof
(
STgField
),
(
size_t
)
fieldSize
);
stable
.
fieldNum
=
(
int16_t
)
fieldSize
;
for
(
int
i
=
0
;
i
<
fieldSize
;
i
++
)
{
STgField
*
fieldSchema
=
&
stable
.
fields
[
i
];
cJSON
*
field
=
cJSON_GetArrayItem
(
fields
,
i
);
if
(
field
==
NULL
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
if
(
field
->
string
==
NULL
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
int
nameLen
=
(
int
)
strlen
(
field
->
string
);
if
(
nameLen
==
0
||
nameLen
>
TSDB_METER_NAME_LEN
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
int
aliasPos
=
-
1
;
for
(
int
i
=
0
;
i
<
nameLen
-
1
;
++
i
)
{
if
(
field
->
string
[
i
]
==
'-'
)
{
aliasPos
=
i
;
break
;
}
}
if
(
aliasPos
==
-
1
)
{
fieldSchema
->
fieldName
=
calloc
((
size_t
)
nameLen
+
1
,
1
);
strcpy
(
fieldSchema
->
fieldName
,
field
->
string
);
fieldSchema
->
fieldAlias
=
calloc
((
size_t
)
nameLen
+
3
,
1
);
strcpy
(
fieldSchema
->
fieldAlias
,
"f_"
);
strcpy
(
fieldSchema
->
fieldAlias
+
2
,
field
->
string
);
createSTableStrLen
+=
(
nameLen
+
4
);
}
else
{
fieldSchema
->
fieldName
=
calloc
((
size_t
)
aliasPos
+
1
,
1
);
fieldSchema
->
fieldAlias
=
calloc
((
size_t
)(
nameLen
-
aliasPos
),
1
);
strncpy
(
fieldSchema
->
fieldName
,
field
->
string
,
(
size_t
)
aliasPos
);
strncpy
(
fieldSchema
->
fieldAlias
,
field
->
string
+
aliasPos
+
1
,
(
size_t
)(
nameLen
-
aliasPos
-
1
));
createSTableStrLen
+=
(
nameLen
-
aliasPos
+
2
);
}
if
(
field
->
type
==
cJSON_String
)
{
if
(
field
->
valuestring
==
NULL
)
{
if
(
fieldSize
>
0
)
{
schema
.
fields
=
calloc
(
sizeof
(
STgSchema
),
(
size_t
)
fieldSize
);
schema
.
fieldNum
=
fieldSize
;
for
(
int
i
=
0
;
i
<
fieldSize
;
i
++
)
{
cJSON
*
field
=
cJSON_GetArrayItem
(
fields
,
i
);
if
(
field
==
NULL
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
int
valueLen
=
(
int
)
strlen
(
field
->
valuestring
);
if
(
valueLen
==
0
)
{
if
(
field
->
valuestring
==
NULL
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
if
(
strcasecmp
(
field
->
valuestring
,
"timestamp"
)
==
0
||
strcasecmp
(
field
->
valuestring
,
"bool"
)
==
0
||
strcasecmp
(
field
->
valuestring
,
"tinyint"
)
==
0
||
strcasecmp
(
field
->
valuestring
,
"smallint"
)
==
0
||
strcasecmp
(
field
->
valuestring
,
"int"
)
==
0
||
strcasecmp
(
field
->
valuestring
,
"bigint"
)
==
0
||
strcasecmp
(
field
->
valuestring
,
"float"
)
==
0
||
strcasecmp
(
field
->
valuestring
,
"double"
)
==
0
||
strncasecmp
(
field
->
valuestring
,
"binary"
,
6
)
==
0
||
strncasecmp
(
field
->
valuestring
,
"nchar"
,
5
)
==
0
)
{
fieldSchema
->
fieldType
=
calloc
((
size_t
)
valueLen
+
1
,
1
);
strcpy
(
fieldSchema
->
fieldType
,
field
->
valuestring
);
createSTableStrLen
+=
valueLen
;
}
else
{
fieldSchema
->
fieldType
=
calloc
(
11
,
1
);
strcpy
(
fieldSchema
->
fieldType
,
"binary(32)"
);
createSTableStrLen
+=
12
;
int
nameLen
=
(
int
)
strlen
(
field
->
valuestring
);
if
(
nameLen
==
0
||
nameLen
>
TSDB_METER_NAME_LEN
)
{
parsedOk
=
false
;
goto
ParseEnd
;
}
}
else
if
(
field
->
type
==
cJSON_False
||
field
->
type
==
cJSON_True
)
{
fieldSchema
->
fieldType
=
calloc
(
8
,
1
);
strcpy
(
fieldSchema
->
fieldType
,
"tinyint"
);
createSTableStrLen
+=
10
;
}
else
{
fieldSchema
->
fieldType
=
calloc
(
7
,
1
);
strcpy
(
fieldSchema
->
fieldType
,
"double"
);
createSTableStrLen
+=
9
;
schema
.
fields
[
i
]
=
calloc
(
nameLen
+
1
,
1
);
strcpy
(
schema
.
fields
[
i
],
field
->
valuestring
);
}
}
// assembling create stable sql
stable
.
createSTableStr
=
calloc
((
size_t
)
createSTableStrLen
,
1
);
strcpy
(
stable
.
createSTableStr
,
"create table if not exists %s.%s(ts timestamp"
);
int
len
=
(
int
)
strlen
(
stable
.
createSTableStr
);
for
(
int
i
=
0
;
i
<
stable
.
fieldNum
;
++
i
)
{
STgField
*
field
=
&
stable
.
fields
[
i
];
len
+=
sprintf
(
stable
.
createSTableStr
+
len
,
",%s %s"
,
field
->
fieldAlias
,
field
->
fieldType
);
}
len
+=
sprintf
(
stable
.
createSTableStr
+
len
,
") tags("
);
for
(
int
i
=
0
;
i
<
stable
.
tagNum
;
++
i
)
{
STgTag
*
tag
=
&
stable
.
tags
[
i
];
if
(
i
==
0
)
{
len
+=
sprintf
(
stable
.
createSTableStr
+
len
,
"%s %s"
,
tag
->
tagAlias
,
tag
->
tagType
);
}
else
{
len
+=
sprintf
(
stable
.
createSTableStr
+
len
,
",%s %s"
,
tag
->
tagAlias
,
tag
->
tagType
);
}
}
sprintf
(
stable
.
createSTableStr
+
len
,
")"
);
ParseEnd:
if
(
parsedOk
)
{
t
aosAddStrHash
(
tgSchemaHash
,
stable
.
stName
,
(
char
*
)(
&
stable
))
;
t
gSchemas
.
schemas
[
tgSchemas
.
size
++
]
=
schema
;
}
else
{
if
(
stable
.
stName
!=
NULL
)
{
free
(
stable
.
stName
);
}
if
(
stable
.
stAlias
!=
NULL
)
{
free
(
stable
.
stName
);
}
for
(
int
i
=
0
;
i
<
stable
.
tagNum
;
++
i
)
{
if
(
stable
.
tags
[
i
].
tagName
!=
NULL
)
{
free
(
stable
.
tags
[
i
].
tagName
);
}
if
(
stable
.
tags
[
i
].
tagAlias
!=
NULL
)
{
free
(
stable
.
tags
[
i
].
tagAlias
);
}
if
(
stable
.
tags
[
i
].
tagType
!=
NULL
)
{
free
(
stable
.
tags
[
i
].
tagType
);
}
}
if
(
stable
.
tags
!=
NULL
)
{
free
(
stable
.
tags
);
}
for
(
int
i
=
0
;
i
<
stable
.
fieldNum
;
++
i
)
{
if
(
stable
.
fields
[
i
].
fieldName
!=
NULL
)
{
free
(
stable
.
fields
[
i
].
fieldName
);
}
if
(
stable
.
fields
[
i
].
fieldAlias
!=
NULL
)
{
free
(
stable
.
fields
[
i
].
fieldAlias
);
}
if
(
stable
.
fields
[
i
].
fieldType
!=
NULL
)
{
free
(
stable
.
fields
[
i
].
fieldType
);
}
}
if
(
stable
.
fields
!=
NULL
)
{
free
(
stable
.
fields
);
}
if
(
stable
.
createSTableStr
!=
NULL
)
{
free
(
stable
.
createSTableStr
);
}
tgFreeSchema
(
&
schema
);
}
}
int
tgReadSchema
(
const
char
*
fileName
)
{
FILE
*
fp
=
fopen
(
fileName
,
"r"
);
if
(
fp
==
NULL
)
{
//httpTrace("failed to open telegraf schema config file:%s, use default schema", fileName);
return
-
1
;
}
httpPrint
(
"open telegraf schema config file:%s successfully"
,
fileName
);
fseek
(
fp
,
0
,
SEEK_END
);
size_t
contentSize
=
(
size_t
)
ftell
(
fp
);
rewind
(
fp
);
char
*
content
=
(
char
*
)
calloc
(
contentSize
*
sizeof
(
char
)
+
1
,
1
);
size_t
result
=
fread
(
content
,
1
,
contentSize
,
fp
);
if
(
result
!=
contentSize
)
{
httpError
(
"failed to read telegraf schema config file:%s, use default schema"
,
fileName
);
return
-
1
;
}
int
tgParseSchema
(
const
char
*
content
,
char
*
fileName
)
{
cJSON
*
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
httpError
(
"failed to parse telegraf schema
config file:%s, invalid json format"
,
fileName
);
httpError
(
"failed to parse telegraf schema
file:%s, invalid json format, content:%s"
,
fileName
,
content
);
return
-
1
;
}
int
size
=
0
;
cJSON
*
metrics
=
cJSON_GetObjectItem
(
root
,
"metrics"
);
if
(
metrics
!=
NULL
)
{
int
size
=
cJSON_GetArraySize
(
metrics
);
size
=
cJSON_GetArraySize
(
metrics
);
if
(
size
<=
0
)
{
httpError
(
"failed to parse telegraf schema
config
file:%s, metrics size is 0"
,
fileName
);
httpError
(
"failed to parse telegraf schema file:%s, metrics size is 0"
,
fileName
);
cJSON_Delete
(
root
);
return
-
1
;
}
tgInitSchemas
(
size
);
for
(
int
i
=
0
;
i
<
size
;
i
++
)
{
cJSON
*
metric
=
cJSON_GetArrayItem
(
metrics
,
i
);
if
(
metric
!=
NULL
)
{
tg
Read
SchemaMetric
(
metric
);
tg
Parse
SchemaMetric
(
metric
);
}
}
}
else
{
tgReadSchemaMetric
(
root
);
size
=
1
;
tgInitSchemas
(
size
);
tgParseSchemaMetric
(
root
);
}
cJSON_Delete
(
root
);
return
size
;
}
int
tgReadSchema
(
const
char
*
fileName
)
{
FILE
*
fp
=
fopen
(
fileName
,
"r"
);
if
(
fp
==
NULL
)
{
return
-
1
;
}
httpPrint
(
"open telegraf schema file:%s success"
,
fileName
);
fseek
(
fp
,
0
,
SEEK_END
);
size_t
contentSize
=
(
size_t
)
ftell
(
fp
);
rewind
(
fp
);
char
*
content
=
(
char
*
)
calloc
(
contentSize
*
sizeof
(
char
)
+
1
,
1
);
size_t
result
=
fread
(
content
,
1
,
contentSize
,
fp
);
if
(
result
!=
contentSize
)
{
httpError
(
"failed to read telegraf schema file:%s"
,
fileName
);
return
-
1
;
}
int
schemaNum
=
tgParseSchema
(
content
,
fileName
);
free
(
content
);
fclose
(
fp
);
httpPrint
(
"parse telegraf schema file:%s, schema size:%d"
,
fileName
,
schemaNum
);
httpPrint
(
"parse telegraf schema config file:%s successfully, stable schema size:%d"
,
fileName
);
return
0
;
return
schemaNum
;
}
/*
* in case of file not exist
* we use default schema:
* such as:
* diskio
* mem
* processes
* procstat
* system
* disk
* swap
* kernel
*/
void
tgInitHandle
(
HttpServer
*
pServer
)
{
tgSchemaHash
=
taosInitStrHash
(
100
,
sizeof
(
STgStable
),
taosHashStringStep1
);
char
fileName
[
256
]
=
{
0
};
sprintf
(
fileName
,
"%s/taos.telegraf.cfg"
,
configDir
);
if
(
tgReadSchema
(
fileName
)
==
-
1
)
{
taosCleanUpStrHash
(
tgSchemaHash
);
tgSchemaHash
=
NULL
;
if
(
tgReadSchema
(
fileName
)
<=
0
)
{
tgFreeSchemas
();
if
(
tgParseSchema
(
DEFAULT_TELEGRAF_CFG
,
"default"
)
<=
0
)
{
tgFreeSchemas
();
}
}
httpAddMethod
(
pServer
,
&
tgDecodeMethod
);
}
...
...
@@ -481,23 +322,57 @@ char *tgGetDbFromUrl(HttpContext *pContext) {
return
pParser
->
path
[
TG_DB_URL_POS
].
pos
;
}
char
*
tgGetStableName
(
char
*
stname
,
cJSON
*
fields
,
int
fieldsSize
)
{
for
(
int
s
=
0
;
s
<
tgSchemas
.
size
;
++
s
)
{
STgSchema
*
schema
=
&
tgSchemas
.
schemas
[
s
];
if
(
strcasecmp
(
schema
->
name
,
stname
)
!=
0
)
{
continue
;
}
bool
schemaMatched
=
true
;
for
(
int
f
=
0
;
f
<
schema
->
fieldNum
;
++
f
)
{
char
*
fieldName
=
schema
->
fields
[
f
];
bool
fieldMatched
=
false
;
for
(
int
i
=
0
;
i
<
fieldsSize
;
i
++
)
{
cJSON
*
field
=
cJSON_GetArrayItem
(
fields
,
i
);
if
(
strcasecmp
(
field
->
string
,
fieldName
)
==
0
)
{
fieldMatched
=
true
;
break
;
}
}
if
(
!
fieldMatched
)
{
schemaMatched
=
false
;
break
;
}
}
if
(
schemaMatched
)
{
return
schema
->
tbName
;
}
}
return
stname
;
}
/*
* parse single metric
{
"fields": {
"field_1": 30,
"field_2": 4,
"field_N": 59,
"n_images": 660
},
"name": "docker",
"tags": {
"host": "raynor"
},
"timestamp": 1458229140
"fields": {
"field_1": 30,
"field_2": 4,
"field_N": 59,
"n_images": 660
},
"name": "docker",
"tags": {
"host": "raynor"
},
"timestamp": 1458229140
}
*/
bool
tgProcessSingleMetric
UseDefaultSchema
(
HttpContext
*
pContext
,
cJSON
*
metric
,
char
*
db
)
{
bool
tgProcessSingleMetric
(
HttpContext
*
pContext
,
cJSON
*
metric
,
char
*
db
)
{
// metric name
cJSON
*
name
=
cJSON_GetObjectItem
(
metric
,
"name"
);
if
(
name
==
NULL
)
{
...
...
@@ -698,7 +573,7 @@ bool tgProcessSingleMetricUseDefaultSchema(HttpContext *pContext, cJSON *metric,
table_cmd
->
timestamp
=
stable_cmd
->
timestamp
=
httpAddToSqlCmdBuffer
(
pContext
,
"%ld"
,
timestamp
->
valueint
);
// stable name
char
*
stname
=
name
->
valuestring
;
char
*
stname
=
tgGetStableName
(
name
->
valuestring
,
fields
,
fieldsSize
)
;
table_cmd
->
metric
=
stable_cmd
->
metric
=
httpAddToSqlCmdBuffer
(
pContext
,
"%s"
,
stname
);
if
(
tsTelegrafUseFieldNum
==
0
)
{
table_cmd
->
stable
=
stable_cmd
->
stable
=
httpAddToSqlCmdBuffer
(
pContext
,
"%s"
,
stname
);
...
...
@@ -851,267 +726,6 @@ bool tgProcessSingleMetricUseDefaultSchema(HttpContext *pContext, cJSON *metric,
return
true
;
}
bool
tgProcessSingleMetricUseConfigSchema
(
HttpContext
*
pContext
,
cJSON
*
metric
,
char
*
db
)
{
// metric name
cJSON
*
name
=
cJSON_GetObjectItem
(
metric
,
"name"
);
if
(
name
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_METRIC_NULL
);
return
false
;
}
if
(
name
->
type
!=
cJSON_String
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_METRIC_TYPE
);
return
false
;
}
if
(
name
->
valuestring
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_METRIC_NAME_NULL
);
return
false
;
}
int
nameLen
=
(
int
)
strlen
(
name
->
valuestring
);
if
(
nameLen
==
0
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_METRIC_NAME_NULL
);
return
false
;
}
STgStable
*
stable
=
(
STgStable
*
)
taosGetStrHashData
(
tgSchemaHash
,
name
->
valuestring
);
if
(
stable
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_STABLE_NOT_EXIST
);
return
false
;
}
// timestamp
cJSON
*
timestamp
=
cJSON_GetObjectItem
(
metric
,
"timestamp"
);
if
(
timestamp
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_TIMESTAMP_NULL
);
return
false
;
}
if
(
timestamp
->
type
!=
cJSON_Number
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_TIMESTAMP_TYPE
);
return
false
;
}
if
(
timestamp
->
valueint
<=
0
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_TIMESTAMP_VAL_NULL
);
return
false
;
}
// tags
cJSON
*
tags
=
cJSON_GetObjectItem
(
metric
,
"tags"
);
if
(
tags
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_TAGS_NULL
);
return
false
;
}
int
tagsSize
=
cJSON_GetArraySize
(
tags
);
if
(
tagsSize
<=
0
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_TAGS_SIZE_0
);
return
false
;
}
for
(
int
i
=
0
;
i
<
tagsSize
;
i
++
)
{
cJSON
*
tag
=
cJSON_GetArrayItem
(
tags
,
i
);
if
(
tag
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_TAG_NULL
);
return
false
;
}
if
(
tag
->
string
==
NULL
||
strlen
(
tag
->
string
)
==
0
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_TAG_NAME_NULL
);
return
false
;
}
if
(
tag
->
type
!=
cJSON_Number
&&
tag
->
type
!=
cJSON_String
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_TAG_VALUE_TYPE
);
return
false
;
}
if
(
tag
->
type
==
cJSON_String
)
{
if
(
tag
->
valuestring
==
NULL
||
strlen
(
tag
->
valuestring
)
==
0
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_TAG_VALUE_NULL
);
return
false
;
}
}
}
// fields
cJSON
*
fields
=
cJSON_GetObjectItem
(
metric
,
"fields"
);
if
(
fields
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_FIELDS_NULL
);
return
false
;
}
int
fieldsSize
=
cJSON_GetArraySize
(
fields
);
if
(
fieldsSize
<=
0
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_FIELDS_SIZE_0
);
return
false
;
}
for
(
int
i
=
0
;
i
<
fieldsSize
;
i
++
)
{
cJSON
*
field
=
cJSON_GetArrayItem
(
fields
,
i
);
if
(
field
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_FIELD_NULL
);
return
false
;
}
if
(
field
->
string
==
NULL
||
strlen
(
field
->
string
)
==
0
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_FIELD_NAME_NULL
);
return
false
;
}
if
(
field
->
type
!=
cJSON_Number
&&
field
->
type
!=
cJSON_String
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_FIELD_VALUE_TYPE
);
return
false
;
}
if
(
field
->
type
==
cJSON_String
)
{
if
(
field
->
valuestring
==
NULL
||
strlen
(
field
->
valuestring
)
==
0
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_FIELD_VALUE_NULL
);
return
false
;
}
}
}
// assembling cmds
HttpSqlCmd
*
stable_cmd
=
httpNewSqlCmd
(
pContext
);
if
(
stable_cmd
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_NO_ENOUGH_MEMORY
);
return
false
;
}
stable_cmd
->
cmdType
=
HTTP_CMD_TYPE_CREATE_STBALE
;
stable_cmd
->
cmdReturnType
=
HTTP_CMD_RETURN_TYPE_NO_RETURN
;
HttpSqlCmd
*
table_cmd
=
httpNewSqlCmd
(
pContext
);
if
(
table_cmd
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_NO_ENOUGH_MEMORY
);
return
false
;
}
table_cmd
->
cmdType
=
HTTP_CMD_TYPE_INSERT
;
table_cmd
->
tagNum
=
stable_cmd
->
tagNum
=
(
int8_t
)
stable
->
tagNum
;
table_cmd
->
timestamp
=
stable_cmd
->
timestamp
=
httpAddToSqlCmdBuffer
(
pContext
,
"%ld"
,
timestamp
->
valueint
);
// stable name
char
*
stname
=
stable
->
stAlias
;
table_cmd
->
metric
=
stable_cmd
->
metric
=
httpAddToSqlCmdBuffer
(
pContext
,
"%s"
,
stname
);
table_cmd
->
stable
=
stable_cmd
->
stable
=
httpAddToSqlCmdBuffer
(
pContext
,
"%s"
,
stname
);
table_cmd
->
stable
=
stable_cmd
->
stable
=
httpShrinkTableName
(
pContext
,
table_cmd
->
stable
,
httpGetCmdsString
(
pContext
,
table_cmd
->
stable
));
// stable tag for detail
for
(
int
ts
=
0
;
ts
<
stable
->
tagNum
;
++
ts
)
{
STgTag
*
tagSchema
=
&
stable
->
tags
[
ts
];
bool
tagParsed
=
false
;
for
(
int
tt
=
0
;
tt
<
tagsSize
;
++
tt
)
{
cJSON
*
tag
=
cJSON_GetArrayItem
(
tags
,
tt
);
if
(
strcasecmp
(
tag
->
string
,
tagSchema
->
tagName
)
!=
0
)
{
continue
;
}
stable_cmd
->
tagNames
[
ts
]
=
table_cmd
->
tagNames
[
ts
]
=
httpAddToSqlCmdBuffer
(
pContext
,
tagSchema
->
tagAlias
);
if
(
tag
->
type
==
cJSON_String
)
{
stable_cmd
->
tagValues
[
ts
]
=
table_cmd
->
tagValues
[
ts
]
=
httpAddToSqlCmdBuffer
(
pContext
,
"'%s'"
,
tag
->
valuestring
);
tagParsed
=
true
;
}
else
if
(
tag
->
type
==
cJSON_Number
)
{
stable_cmd
->
tagValues
[
ts
]
=
table_cmd
->
tagValues
[
ts
]
=
httpAddToSqlCmdBuffer
(
pContext
,
"%ld"
,
tag
->
valueint
);
tagParsed
=
true
;
}
else
if
(
tag
->
type
==
cJSON_True
)
{
stable_cmd
->
tagValues
[
ts
]
=
table_cmd
->
tagValues
[
ts
]
=
httpAddToSqlCmdBuffer
(
pContext
,
"1"
);
tagParsed
=
true
;
}
else
if
(
tag
->
type
==
cJSON_False
)
{
stable_cmd
->
tagValues
[
ts
]
=
table_cmd
->
tagValues
[
ts
]
=
httpAddToSqlCmdBuffer
(
pContext
,
"0"
);
tagParsed
=
true
;
}
else
{
}
break
;
}
if
(
!
tagParsed
)
{
stable_cmd
->
tagValues
[
ts
]
=
table_cmd
->
tagValues
[
ts
]
=
httpAddToSqlCmdBuffer
(
pContext
,
"NULL"
);
}
}
// table name
table_cmd
->
table
=
stable_cmd
->
table
=
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
"%s"
,
stname
);
for
(
int
ts
=
0
;
ts
<
stable
->
tagNum
;
++
ts
)
{
STgTag
*
tagSchema
=
&
stable
->
tags
[
ts
];
bool
tagParsed
=
false
;
for
(
int
tt
=
0
;
tt
<
tagsSize
;
++
tt
)
{
cJSON
*
tag
=
cJSON_GetArrayItem
(
tags
,
tt
);
if
(
strcasecmp
(
tag
->
string
,
tagSchema
->
tagName
)
!=
0
)
{
continue
;
}
if
(
tag
->
type
==
cJSON_String
)
{
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
"_%s"
,
tag
->
valuestring
);
tagParsed
=
true
;
}
else
if
(
tag
->
type
==
cJSON_Number
)
{
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
"_%ld"
,
tag
->
valueint
);
tagParsed
=
true
;
}
else
if
(
tag
->
type
==
cJSON_True
)
{
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
"_1"
);
tagParsed
=
true
;
}
else
if
(
tag
->
type
==
cJSON_False
)
{
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
"_0"
);
tagParsed
=
true
;
}
else
{
}
break
;
}
if
(
!
tagParsed
)
{
stable_cmd
->
tagValues
[
ts
]
=
table_cmd
->
tagValues
[
ts
]
=
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
"_n"
);
}
}
httpAddToSqlCmdBuffer
(
pContext
,
""
);
table_cmd
->
table
=
stable_cmd
->
table
=
httpShrinkTableName
(
pContext
,
table_cmd
->
table
,
httpGetCmdsString
(
pContext
,
table_cmd
->
table
));
// assembling create stable sql
stable_cmd
->
sql
=
httpAddToSqlCmdBuffer
(
pContext
,
stable
->
createSTableStr
,
db
,
httpGetCmdsString
(
pContext
,
table_cmd
->
stable
));
// assembling insert sql
table_cmd
->
sql
=
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
"import into %s.%s using %s.%s tags("
,
db
,
httpGetCmdsString
(
pContext
,
table_cmd
->
table
),
db
,
httpGetCmdsString
(
pContext
,
table_cmd
->
stable
));
for
(
int
ts
=
0
;
ts
<
stable
->
tagNum
;
++
ts
)
{
if
(
ts
!=
0
)
{
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
",%s"
,
httpGetCmdsString
(
pContext
,
stable_cmd
->
tagValues
[
ts
]));
}
else
{
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
"%s"
,
httpGetCmdsString
(
pContext
,
stable_cmd
->
tagValues
[
ts
]));
}
}
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
") values(%ld"
,
timestamp
->
valueint
);
// stable tag for detail
for
(
int
fs
=
0
;
fs
<
stable
->
fieldNum
;
++
fs
)
{
STgField
*
fieldSchema
=
&
stable
->
fields
[
fs
];
bool
fieldParsed
=
false
;
for
(
int
ff
=
0
;
ff
<
fieldsSize
;
++
ff
)
{
cJSON
*
field
=
cJSON_GetArrayItem
(
fields
,
ff
);
if
(
strcasecmp
(
field
->
string
,
fieldSchema
->
fieldName
)
!=
0
)
{
continue
;
}
if
(
field
->
type
==
cJSON_String
)
{
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
",
\"
%s
\"
"
,
field
->
valuestring
);
fieldParsed
=
true
;
}
else
if
(
field
->
type
==
cJSON_Number
)
{
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
",%lf"
,
field
->
valuedouble
);
fieldParsed
=
true
;
}
else
if
(
field
->
type
==
cJSON_True
)
{
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
",1"
);
fieldParsed
=
true
;
}
else
if
(
field
->
type
==
cJSON_False
)
{
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
",0"
);
fieldParsed
=
true
;
}
else
{
}
break
;
}
if
(
!
fieldParsed
)
{
httpAddToSqlCmdBufferNoTerminal
(
pContext
,
",NULL"
);
}
}
httpAddToSqlCmdBuffer
(
pContext
,
")"
);
return
true
;
}
/**
* request from telegraf 1.7.0
* single request:
...
...
@@ -1213,16 +827,9 @@ bool tgProcessQueryRequest(HttpContext *pContext, char *db) {
for
(
int
i
=
0
;
i
<
size
;
i
++
)
{
cJSON
*
metric
=
cJSON_GetArrayItem
(
metrics
,
i
);
if
(
metric
!=
NULL
)
{
if
(
tgSchemaHash
!=
NULL
)
{
if
(
!
tgProcessSingleMetricUseConfigSchema
(
pContext
,
metric
,
db
))
{
cJSON_Delete
(
root
);
return
false
;
}
}
else
{
if
(
!
tgProcessSingleMetricUseDefaultSchema
(
pContext
,
metric
,
db
))
{
cJSON_Delete
(
root
);
return
false
;
}
if
(
!
tgProcessSingleMetric
(
pContext
,
metric
,
db
))
{
cJSON_Delete
(
root
);
return
false
;
}
}
}
...
...
@@ -1245,16 +852,9 @@ bool tgProcessQueryRequest(HttpContext *pContext, char *db) {
cmd
->
cmdReturnType
=
HTTP_CMD_RETURN_TYPE_NO_RETURN
;
cmd
->
sql
=
httpAddToSqlCmdBuffer
(
pContext
,
"create database if not exists %s"
,
db
);
if
(
tgSchemaHash
!=
NULL
)
{
if
(
!
tgProcessSingleMetricUseConfigSchema
(
pContext
,
root
,
db
))
{
cJSON_Delete
(
root
);
return
false
;
}
}
else
{
if
(
!
tgProcessSingleMetricUseDefaultSchema
(
pContext
,
root
,
db
))
{
cJSON_Delete
(
root
);
return
false
;
}
if
(
!
tgProcessSingleMetric
(
pContext
,
root
,
db
))
{
cJSON_Delete
(
root
);
return
false
;
}
}
...
...
@@ -1268,9 +868,6 @@ bool tgProcessQueryRequest(HttpContext *pContext, char *db) {
}
bool
tgProcessRquest
(
struct
HttpContext
*
pContext
)
{
tgGetUserFromUrl
(
pContext
);
tgGetPassFromUrl
(
pContext
);
if
(
strlen
(
pContext
->
user
)
==
0
||
strlen
(
pContext
->
pass
)
==
0
)
{
httpSendErrorResp
(
pContext
,
HTTP_PARSE_USR_ERROR
);
return
false
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录