Commit b3d33993 authored by liu0x54

[TD-212] add bailongma code

Parent c663f351
## Builder image
FROM tdengine/tdengine:dev as builder1
FROM golang:latest
WORKDIR /root
#COPY --from=builder1 /usr/include/taos.h /usr/include/
#COPY --from=builder1 /usr/lib/libtaos.so /usr/lib/libtaos.so
COPY --from=builder1 /usr/include/taos.h /usr/include/
COPY --from=builder1 /usr/lib/libtaos.so.1 /usr/lib/
RUN mkdir /usr/lib/ld
RUN ln -s /usr/lib/libtaos.so.1 /usr/lib/libtaos.so
RUN git config --global http.sslVerify false
RUN git config --global http.postbuffer 524288000
RUN go get -v -u -insecure github.com/taosdata/TDengine/src/connector/go/src/taosSql
RUN go get -v -u -insecure github.com/gogo/protobuf/proto
RUN go get -v -u -insecure github.com/golang/snappy
RUN go get -v -u -insecure github.com/prometheus/common/model
RUN go get -v -u -insecure github.com/prometheus/prometheus/prompb
RUN go get github.com/taosdata/driver-go/taosSql
## Builder image
FROM tdengine/tdengine:dev as builder1
FROM tdengine/bailongma:dev as builder
COPY server.go /root/blm_prometheus/
WORKDIR /root/blm_prometheus
# build enterprise version
RUN go build
# # build community version
# RUN cmake .. -DVERSION=lite && cmake --build .
## Target image
FROM centos:7
WORKDIR /root/
# COPY --from=builder /root/build/build/lib/libtaos.so /usr/lib/libtaos.so.1
# RUN ln -s /usr/lib/libtaos.so.1 /usr/lib/libtaos.so
COPY --from=builder1 /usr/include/taos.h /usr/include/
COPY --from=builder1 /usr/lib/libtaos.so.1 /usr/lib/libtaos.so.1
RUN ln -s /usr/lib/libtaos.so.1 /usr/lib/libtaos.so
COPY --from=builder /root/blm_prometheus/blm_prometheus .
#COPY community/packaging/cfg/taos.cfg /etc/taos/
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib"
ENV LANG=en_US.UTF-8
ENV LANGUAGE=en_US:en
ENV LC_ALL=en_US.UTF-8
EXPOSE 10203
VOLUME [ "/var/lib/taos", "/var/log/taos","/etc/taos" ]
ENTRYPOINT [ "/root/blm_prometheus" ]
# Prometheus Remote Write Adapter for TDengine
This is an adapter to support Prometheus remote write into TDengine.
## Prerequisite
Before building the program, install Go 1.10 or later and [TDengine][] so the program can use the TDengine client library.
To build it:
```
go build
```
If the build fails because some required packages are missing, fetch them with `go get`:
```
go get github.com/gogo/protobuf/proto
go get github.com/golang/snappy
go get github.com/prometheus/common/model
go get github.com/taosdata/TDengine/src/connector/go/src/taosSql
go get github.com/prometheus/prometheus/prompb
```
After a successful build, an executable named `blm_prometheus` will be created in the same directory.
## Running in background
Use the following command to run the program in the background:
```
nohup ./blm_prometheus --tdengine-ip 112.102.3.69 --batch-size 80 --http-workers 2 --sql-workers 2 --dbname prometheus --port 1234 > /dev/null 2>&1 &
```
The following options can be set:
```sh
--tdengine-ip
set the IP address of TDengine, for example "192.168.0.1"
--tdengine-name
set the domain name of TDengine, so that blm_prometheus can look up the IP address of TDengine.
--tdengine-api-port
set the RESTful API port of TDengine. blm_prometheus queries table schema info from TDengine to keep meta info synchronized. Default is 6020.
--batch-size
set how many records are written to TDengine in one SQL statement. TDengine only accepts SQL statements smaller than 64000 bytes, so the batch size usually should not exceed 200. Default is 100.
--http-workers
set the number of workers that process HTTP requests. Default is 10.
--sql-workers
set the number of workers that process database requests. Default is 10.
--dbname
set the database name in TDengine; if it does not exist, a database with this name will be created. Default is "prometheus".
--dbuser
set the user name that has the right to access TDengine. Default is "root".
--dbpassword
set the password of dbuser. Default is "taosdata".
--port
set the port that Prometheus's remote_write configuration points to, as shown in prometheus.yml below. Default is 10203.
```
## Start prometheus
Add the following to your Prometheus configuration `prometheus.yml`:
```yaml
remote_write:
- url: "http://localhost:1234/receive"
```
Then start Prometheus:
```
prometheus
```
Then you can check TDengine to see whether the super tables and tables have been created.
## Check the TDengine tables and data
Use the taos client shell to query the result.
```
Welcome to the TDengine shell from linux, client version:1.6.4.0 server version:1.6.4.0
Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.
This is the trial version and will expire at 2019-12-11 14:25:31.
taos> use prometheus;
Database changed.
taos> show stables;
name | created_time |columns| tags | tables |
====================================================================================================================
prometheus_sd_kubernetes_cache_watch_events_sum | 19-11-15 17:45:07.594| 2| 3| 1|
prometheus_sd_kubernetes_cache_watch_events_count | 19-11-15 17:45:07.596| 2| 3| 1|
prometheus_sd_kubernetes_cache_watches_total | 19-11-15 17:45:07.598| 2| 3| 1|
prometheus_sd_kubernetes_events_total | 19-11-15 17:45:07.600| 2| 5| 15|
prometheus_target_scrape_pool_reloads_total | 19-11-15 17:45:07.672| 2| 3| 1|
prometheus_sd_received_updates_total | 19-11-15 17:45:07.674| 2| 4| 1|
prometheus_target_scrape_pool_reloads_failed_total | 19-11-15 17:45:07.730| 2| 3| 1|
prometheus_sd_updates_total | 19-11-15 17:45:07.732| 2| 4| 1|
prometheus_target_scrape_pool_sync_total | 19-11-15 17:45:07.734| 2| 4| 1|
......
go_memstats_gc_cpu_fraction | 19-11-15 17:45:06.599| 2| 3| 1|
Query OK, 211 row(s) in set (0.004891s)
taos> select * from prometheus_sd_updates_total;
......
19-11-16 14:24:00.271| 1.000000000|localhost:9090 |prometheus |codelab-monitor |scrape |
19-11-16 14:24:05.271| 1.000000000|localhost:9090 |prometheus |codelab-monitor |scrape |
Query OK, 3029 row(s) in set (0.060828s)
```
## Support Kubernetes liveness probe
blm_prometheus supports a liveness probe.
While the service is running, a GET request to `http://ip:port/health` returns a success response, which means the service is healthy. If there is no response, the service is dead and needs to be restarted.
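For reference, here is a minimal standalone probe in Go (a sketch only; it assumes the adapter listens on localhost with the default port 10203):
```go
package main

import (
	"fmt"
	"net/http"
	"os"
	"time"
)

func main() {
	// Port 10203 is the adapter's default --port; adjust if you changed it.
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Get("http://localhost:10203/health")
	if err != nil {
		fmt.Fprintln(os.Stderr, "service not reachable:", err)
		os.Exit(1)
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		fmt.Fprintln(os.Stderr, "unhealthy:", resp.Status)
		os.Exit(1)
	}
	fmt.Println("healthy:", resp.Status)
}
```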
## Limitations
TDengine limits the length of a super table name, so if a Prometheus metric name exceeds 60 bytes, it is truncated to the first 60 bytes. The length of a label name is limited to 50 bytes.
[TDengine]:https://www.github.com/Taosdata/TDengine
This diff has been collapsed.
## Builder image
FROM tdengine/tdengine:dev as builder1
FROM tdengine/bailongma:dev as builder
COPY server.go /root/blm_telegraf/
WORKDIR /root/blm_telegraf
RUN go build
# # build community version
# RUN cmake .. -DVERSION=lite && cmake --build .
## Target image
FROM centos:7
WORKDIR /root
# COPY --from=builder /root/build/build/lib/libtaos.so /usr/lib/libtaos.so.1
# RUN ln -s /usr/lib/libtaos.so.1 /usr/lib/libtaos.so
COPY --from=builder1 /usr/include/taos.h /usr/include/
COPY --from=builder1 /usr/lib/libtaos.so /usr/lib/libtaos.so
RUN ln -s /usr/lib/libtaos.so /usr/lib/libtaos.so.1
COPY --from=builder /root/blm_telegraf/blm_telegraf .
#COPY community/packaging/cfg/taos.cfg /etc/taos/
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib"
ENV LANG=en_US.UTF-8
ENV LANGUAGE=en_US:en
ENV LC_ALL=en_US.UTF-8
EXPOSE 10202
VOLUME [ "/var/lib/taos", "/var/log/taos" ]
ENTRYPOINT [ "/root/blm_telegraf","--host" ]
# API for Telegraf
This is an API to support Telegraf writing data into TDengine.
## Prerequisite
Before building the program, install Go 1.10 or later and [TDengine][] so the program can use the TDengine client library.
To build it:
```
go build
```
If the build fails because some required packages are missing, fetch them with `go get`:
```
go get github.com/taosdata/TDengine/src/connector/go/src/taosSql
```
After a successful build, an executable named `blm_telegraf` will be created in the same directory.
## Running in background
Use the following command to run the program in the background:
```
nohup ./blm_telegraf --host 112.102.3.69:0 --batch-size 200 --http-workers 2 --sql-workers 2 --dbname telegraf --port 1234 > /dev/null 2>&1 &
```
The API URL is `http://ipaddress:port/telegraf`.
The following options can be set:
```
--host
set the host of TDengine as IP:port, for example "192.168.0.1:0"
--batch-size
set how many records are written to TDengine in one SQL statement. TDengine only accepts SQL statements smaller than 64000 bytes, so the batch size usually should not exceed 800. Default is 10.
--http-workers
set the number of workers that process HTTP requests. Default is 10.
--sql-workers
set the number of workers that process database requests. Default is 10.
--dbname
set the database name in TDengine; if it does not exist, a database with this name will be created. Default is "telegraf".
--dbuser
set the user name that has the right to access TDengine. Default is "root".
--dbpassword
set the password of dbuser. Default is "taosdata".
--port
set the port that the Telegraf HTTP output plugin writes to, as shown in the Telegraf configuration below. Default is 10202.
```
## Configure Telegraf
To write into the blm_telegraf API, configure Telegraf as below.
In the Telegraf configuration file, in the output plugin part:
1. Telegraf output plugin setup:
Set the url to the blm_telegraf API.
Set the data format to "json".
Set the JSON timestamp units to "1ms".
```toml
[[outputs.http]]
# ## URL is the address to send metrics to
url = "http://114.116.124.178:8081/telegraf"
data_format = "json"
json_timestamp_units = "1ms"
```
In the agent part, the hostname must be unique among all the Telegraf instances that report to TDengine.
```toml
# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "5s"
...
## Override default hostname, if empty use os.Hostname()
hostname = "testhost1"
```
Then start Telegraf:
```sh
telegraf --config xxx.conf
```
Then you can check TDengine to see whether the super tables and tables have been created.
## Check the TDengine tables and data
Use the taos client shell to query the result.
```
Welcome to the TDengine shell from linux, client version:1.6.4.0 server version:1.6.4.0
Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.
This is the trial version and will expire at 2019-12-11 14:25:31.
taos> use prometheus;
Database changed.
taos> show stables;
name | created_time |columns| tags | tables |
====================================================================================================================
system | 19-11-22 21:48:10.205| 2| 3| 12|
system_str | 19-11-22 21:48:10.205| 2| 3| 2|
cpu | 19-11-22 21:48:10.225| 2| 4| 200|
cpu_str | 19-11-22 21:48:10.226| 2| 4| 0|
processes | 19-11-22 21:48:10.230| 2| 3| 16|
processes_str | 19-11-22 21:48:10.230| 2| 3| 0|
disk | 19-11-22 21:48:10.233| 2| 7| 357|
disk_str | 19-11-22 21:48:10.234| 2| 7| 0|
diskio | 19-11-22 21:48:10.247| 2| 4| 72|
diskio_str | 19-11-22 21:48:10.248| 2| 4| 0|
swap | 19-11-22 21:48:10.254| 2| 3| 7|
swap_str | 19-11-22 21:48:10.255| 2| 3| 0|
mem | 19-11-22 21:48:10.272| 2| 3| 61|
mem_str | 19-11-22 21:48:10.272| 2| 3| 0|
Query OK, 14 row(s) in set (0.000733s)
taos> select * from mem;
......
19-11-23 14:19:11.000| 0.000000000|testhost1 |1.202.240.226 |huge_pages_free |
19-11-23 14:19:16.000| 0.000000000|testhost1 |1.202.240.226 |huge_pages_free |
19-11-23 14:19:21.000| 0.000000000|testhost1 |1.202.240.226 |huge_pages_free |
19-11-23 14:19:26.000| 0.000000000|testhost1 |1.202.240.226 |huge_pages_free |
Query OK, 3029 row(s) in set (0.060828s)
```
## Support Kubernetes liveness probe
blm_telegraf supports a liveness probe.
While the service is running, a GET request to `http://ip:port/health` returns a success response, which means the service is healthy. If there is no response, the service is dead and needs to be restarted.
## Limitations
TDengine limits the length of a super table name, so if a Telegraf measurement name exceeds 60 bytes, it is truncated to the first 60 bytes. The length of a tag name is limited to 50 bytes.
[TDengine]:https://www.github.com/Taosdata/TDengine
# API program for writing Telegraf data into TDengine
Having just finished the API program that writes Telegraf data into TDengine, this article summarizes the program's design and, on that basis, proposes a schemaless method for writing into TDengine along with the corresponding query approach.
## Analysis of Telegraf data
After Telegraf collects data from a node, it emits each record as a measurement name plus a series of tags, followed by a series of fields and a timestamp.
```
cpu,cpu=cpu-total,host=liutaodeMacBook-Pro.local usage_irq=0,usage_guest=0,usage_guest_nice=0,usage_iowait=0,usage_softirq=0,usage_steal=0,usage_user=10.55527763881941,usage_system=3.5767883941970986,usage_idle=85.86793396698349,usage_nice=0 1571663200000000000
```
The line above is a record in InfluxDB line-protocol format: the first token is the measurement, followed by two tags; a space separates the tags from the fields, and another space separates the fields from the timestamp.
```json
{
"fields":{
"usage_guest":0,
"usage_guest_nice":0,
"usage_idle":87.73726273726274,
"usage_iowait":0,
"usage_irq":0,
"usage_nice":0,
"usage_softirq":0,
"usage_steal":0,
"usage_system":2.6973026973026974,
"usage_user":9.565434565434565
},
"name":"cpu",
"tags":{
"cpu":"cpu-total",
"host":"liutaodeMacBook-Pro.local"
},
"timestamp":1571665100
}
```
If JSON output is used instead, one record looks like the JSON above.
The data above looks very similar to TDengine's record format, so it is natural to map name to the super table name, tags to tags, fields to values, and timestamp to timestamp; in principle it should be very easy to write into TDengine. In practice, however, Telegraf output frequently shows the following problem: with the same name and the same tags, the layout of fields differs, and both the field names and their number can change. The variation is not arbitrary; it is usually an alternation between two or three combinations. For example:
```
swap,host=testhost1 out=0i,in=0i 1574663615000000000
swap,host=testhost1 total=4294967296i,used=3473670144i,free=821297152i,used_percent=80.877685546875 1574663615000000000
```
Two records arriving at the same time point both have name swap and the tag host, but their fields are completely different.
Another example:
```
system,host=testhost1 uptime_format="5 days, 1:07" 1574663615000000000
system,host=testhost1 uptime=436070i 1574663615000000000
system,host=testhost1 load15=5.9521484375,n_cpus=4i,n_users=6i,load1=3.17138671875,load5=6.462890625 1574663615000000000
```
Three records arriving at the same time point all have name system and the tag host, but their fields are completely different.
If name is used as the TDengine super table name, the table structure would have to change. Moreover, because InfluxDB is schemaless by design, it handles this variation well: no matter how the fields change, writes succeed. It is therefore hard to guarantee how the fields of future Telegraf data will change, and how to design the TDengine storage schema becomes a real question.
## Design approaches for the TDengine schema
For a data source like this, there are generally two design approaches:
### Design the super tables in advance, containing all fields (the schema approach)
The first approach is, after gaining a thorough understanding of the data's behavior, to design the TDengine table layout in advance: put all fields that may vary into the values columns and create the super tables beforehand; then, when inserting, collect all fields that share a timestamp, assemble them into one TDengine SQL record, and write it. This is the method we usually use today and can be called schema-based writing. Its advantage is that it matches TDengine's design assumptions: values have multiple columns, so write performance is relatively high. It also has clear drawbacks: it requires a lot of up-front data analysis to determine the format of every measurement, and the schema config files must be written by hand, or the super tables created manually in the TDengine client. For monitoring with many nodes, this design means a lot of work.
### Do not design super tables in advance; define a table-creation rule and write incoming data automatically, similar to InfluxDB's schemaless approach
The second approach is to create tables automatically from the received data: anything in the name, tags, fields, timestamp format can have its TDengine tables created on the fly and be written in. This program takes that approach: each field is split out individually and combined with name and tags to form a single-column table. Such a super table accommodates any set of fields, so arbitrary fields can be written. The rest of this article follows this design.
## Super tables
The program uses the name of the received data as the super table name and the tags of the raw data as tags, and adds two extra tags: one is the source IP of the request, used to distinguish devices; the other is field, whose value is the name of one of the raw data's fields, indicating which metric the table stores. Taking the system data above as an example, the super table structure is:
```toml
stablename = system
tags = ['host','srcip','field']
values = ['timestamp','value']
```
The tags are all of type binary(50); tag values longer than 50 are truncated to 50. The possible values of the field tag are:
```toml
field : ['uptime_format','uptime','load15','n_cpus','n_users','load1','load5']
```
## Value types
Since the types of the incoming data cannot be known in advance, and to simplify the implementation, values are divided into two classes: numeric values, stored uniformly as double, and strings, stored uniformly as binary(256). Because every field of a name must be stored through the same super tables, two super tables are created for each name from the start: a numeric super table whose name is the name itself, and a string super table whose name is the name plus the _str suffix. Depending on a field's data type, the table is then created from the numeric super table for numeric values, or from the name_str super table for strings.
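As a minimal sketch of this type dispatch (illustrative names only; the actual logic is the type switch inside SerilizeTDengine in server.go):
```go
package main

import "fmt"

// stableFor picks the super table for one field value: strings go to
// "<name>_str", while every numeric type is stored as double in "<name>".
func stableFor(name string, value interface{}) string {
	switch value.(type) {
	case string:
		return name + "_str"
	default:
		return name
	}
}

func main() {
	fmt.Println(stableFor("system", "5 days, 1:07")) // system_str
	fmt.Println(stableFor("system", 436070.0))       // system
}
```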
As a result, twice as many super tables are created:
```
name | created_time |columns| tags | tables |
====================================================================================================================
system | 19-11-22 21:48:10.205| 2| 3| 12|
system_str | 19-11-22 21:48:10.205| 2| 3| 2|
cpu | 19-11-22 21:48:10.225| 2| 4| 200|
cpu_str | 19-11-22 21:48:10.226| 2| 4| 0|
processes | 19-11-22 21:48:10.230| 2| 3| 16|
processes_str | 19-11-22 21:48:10.230| 2| 3| 0|
disk | 19-11-22 21:48:10.233| 2| 7| 357|
disk_str | 19-11-22 21:48:10.234| 2| 7| 0|
diskio | 19-11-22 21:48:10.247| 2| 4| 72|
diskio_str | 19-11-22 21:48:10.248| 2| 4| 0|
swap | 19-11-22 21:48:10.254| 2| 3| 7|
swap_str | 19-11-22 21:48:10.255| 2| 3| 0|
mem | 19-11-22 21:48:10.272| 2| 3| 61|
mem_str | 19-11-22 21:48:10.272| 2| 3| 0|
Query OK, 14 row(s) in set (0.000588s)
```
At query time, therefore, the super table must be chosen according to the type of the queried value.
For example, for a numeric value, the statement to query n_cpus is:
```sql
Select * from system where field = "n_cpus";
```
For a string value, the statement to query uptime_format is:
```sql
Select * from system_str where field = "uptime_format";
```
## Table creation
For each field, the program creates a table, with the table name derived as follows:
concatenate all tag values of the raw data, the source IP, and the field name into one long string, compute its MD5, and prefix the result with MD5_ to form the table name.
This rule guarantees that as long as characteristics such as the data's tags do not change, the table does not change.
```
...
md5_b26d30c2e07529ac309d836b3b222f15 | 19-11-24 21:34:35.830| 2|processes |
md5_08147d718d4961368155f90432eab536 | 19-11-22 21:48:10.748| 2|disk |
md5_105158abfca0bbf0d932cc74bfc7e136 | 19-11-24 21:34:35.846| 2|mem |
md5_e6842b5c6b9744b7d5ce3510a4d54c98 | 19-11-24 21:34:35.874| 2|disk |
md5_285fd02686e0bfee76d22505dd29f14c | 19-11-22 21:48:11.509| 2|disk |
md5_9317870bb00109353f6aef58ee2ee9e9 | 19-11-24 21:34:35.919| 2|cpu |
Query OK, 727 row(s) in set (0.020405s)
```
At insert time, therefore, the table name can be computed from the tag values, the IP, and the field name alone, and the row inserted directly into that table.
At query time, the super table can be queried with tag values and a field value, which is also clear and convenient.
The SQL statement generated automatically for a numeric write thus looks like:
```sql
insert into md5_285fd02686e0bfee76d22505dd29f14c values(1574663615000,375.2023040);
```
The table name md5_285fd02686e0bfee76d22505dd29f14c is computed automatically from the data's characteristics; no manual input is needed.
Queries, in turn, can go through the super table, as the sketch below illustrates.
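As a minimal sketch, the naming rule can be expressed as follows (mirroring md5V2 and the key construction in SerilizeTDengine; tag values are sorted because Go map iteration order is random):
```go
package main

import (
	"crypto/md5"
	"fmt"
	"sort"
	"strings"
)

// tableNameFor derives the per-field table name: MD5 over
// name + sorted tag values + source IP + field name, prefixed with "MD5_".
func tableNameFor(name string, tags map[string]string, srcIP, field string) string {
	vals := make([]string, 0, len(tags))
	for _, v := range tags {
		vals = append(vals, v)
	}
	sort.Strings(vals) // stable order for a stable table name
	key := name + strings.Join(vals, "") + srcIP + field
	return fmt.Sprintf("MD5_%x", md5.Sum([]byte(key)))
}

func main() {
	tags := map[string]string{"host": "testhost1"}
	fmt.Println(tableNameFor("swap", tags, "1.202.240.226", "total"))
}
```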
## Schemaless write method
Building on the implementation above, we can later settle on a write syntax so that no schema needs to be defined in advance: super tables and tables are created automatically from the received data, making writes into TDengine convenient.
By keeping the current JSON format, or by defining a write syntax modeled on InfluxDB's, we can provide InfluxDB-style schemaless write capability.
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"container/list"
"crypto/md5"
"database/sql"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
_ "github.com/taosdata/TDengine/src/connector/go/src/taosSql"
)
type metric struct {
Fields map[string]interface{}
Name string
Tags map[string]string
TimeStamp int64
}
type Metrics struct {
Metrics []metric
HostIP string
}
var (
daemonUrl string
httpworkers int
sqlworkers int
batchSize int
buffersize int
dbname string
dbuser string
dbpassword string
rwport string
debugprt int
taglen int
)
type nametag struct {
tagmap map[string]string
taglist *list.List
}
// Global vars
var (
bufPool sync.Pool
batchChans []chan string //multi table one chan
nodeChans []chan Metrics //multi node one chan
inputDone chan struct{}
workersGroup sync.WaitGroup
reportTags [][2]string
reportHostname string
taosDriverName string = "taosSql"
IsSTableCreated sync.Map
IsTableCreated sync.Map
taglist *list.List
nametagmap map[string]nametag
tagstr string
blmLog *log.Logger
logNameDefault string = "/var/log/taos/blm_telegraf.log"
)
var scratchBufPool = &sync.Pool{
New: func() interface{} {
return make([]byte, 0, 1024)
},
}
// Parse args:
func init() {
flag.StringVar(&daemonUrl, "host", "", "TDengine host.")
flag.IntVar(&batchSize, "batch-size", 10, "Batch size (input items).")
flag.IntVar(&httpworkers, "http-workers", 10, "Number of parallel http requests handler .")
flag.IntVar(&sqlworkers, "sql-workers", 10, "Number of parallel sql handler.")
flag.StringVar(&dbname, "dbname", "telegraf", "Database name where to store metrics")
flag.StringVar(&dbuser, "dbuser", "root", "User for host to send result metrics")
flag.StringVar(&dbpassword, "dbpassword", "taosdata", "User password for Host to send result metrics")
flag.StringVar(&rwport, "port", "10202", "remote write port")
flag.IntVar(&debugprt, "debugprt", 0, "if 0 not print, if 1 print the sql")
flag.IntVar(&taglen, "tag-length", 30, "the max length of tag string")
flag.IntVar(&buffersize, "buffersize", 100, "the buffer size of metrics received")
flag.Parse()
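// Appending ":0" is assumed to make the taosSql driver fall back to the port configured on the TDengine client side.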
daemonUrl = daemonUrl + ":0"
nametagmap = make(map[string]nametag)
fmt.Print("host: ")
fmt.Print(daemonUrl)
fmt.Print(" port: ")
fmt.Print(rwport)
fmt.Print(" database: ")
fmt.Print(dbname)
tagstr = fmt.Sprintf(" binary(%d)", taglen)
logFile, err := os.OpenFile(logNameDefault, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
blmLog = log.New(logFile, "", log.LstdFlags)
blmLog.SetPrefix("BLM_TLG")
blmLog.SetFlags(log.LstdFlags | log.Lshortfile)
}
func main() {
for i := 0; i < httpworkers; i++ {
nodeChans = append(nodeChans, make(chan Metrics, buffersize))
}
createDatabase(dbname)
for i := 0; i < httpworkers; i++ {
workersGroup.Add(1)
go NodeProcess(i)
}
for i := 0; i < sqlworkers; i++ {
batchChans = append(batchChans, make(chan string, batchSize))
}
for i := 0; i < sqlworkers; i++ {
workersGroup.Add(1)
go processBatches(i)
}
http.HandleFunc("/telegraf", func(w http.ResponseWriter, r *http.Request) {
addr := strings.Split(r.RemoteAddr, ":")
idx := TAOShashID([]byte(addr[0]))
reqBuf, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var req Metrics
if err := json.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
req.HostIP = addr[0]
nodeChans[idx%httpworkers] <- req
w.WriteHeader(http.StatusAccepted)
})
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
})
blmLog.Fatal(http.ListenAndServe(":"+rwport, nil))
}
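// TAOShashID computes a simple byte-sum hash of ba; it is used to shard incoming requests and insert fragments across worker channels.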
func TAOShashID(ba []byte) int {
var sum int = 0
for i := 0; i < len(ba); i++ {
sum += int(ba[i] - '0')
}
return sum
}
func TAOSstrCmp(a string, b string) bool {
//return whether a sorts before b in dictionary order.
for i := 0; i < len(a) && i < len(b); i++ {
if int(a[i]-'0') > int(b[i]-'0') {
return false
} else if int(a[i]-'0') < int(b[i]-'0') {
return true
}
}
if len(a) > len(b) {
return false
} else {
return true
}
}
func NodeProcess(workerid int) error {
for req := range nodeChans[workerid] {
ProcessReq(req)
}
return nil
}
// OrderInsert inserts ts into the ascending-ordered list l.
func OrderInsert(ts int64, l *list.List) {
e := l.Front()
if e == nil {
l.PushFront(ts)
return
}
for e = l.Front(); e != nil; e = e.Next() {
if e.Value.(int64) < ts {
continue
} else {
l.InsertBefore(ts, e)
return
}
}
// ts is greater than every element; append it (mirrors OrderInsertS).
l.PushBack(ts)
}
func OrderInsertS(s string, l *list.List) {
e := l.Front()
if e == nil {
l.PushFront(s)
return
}
for e = l.Front(); e != nil; e = e.Next() {
str := e.Value.(string)
if TAOSstrCmp(str, s) {
continue
} else {
l.InsertBefore(s, e)
return
}
}
l.PushBack(s)
return
}
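// ProcessReq groups the received metrics by timestamp and measurement name, then processes the groups in timestamp order.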
func ProcessReq(req Metrics) error {
tsmap := make(map[int64]map[string][]metric)
tslist := list.New()
addr := req.HostIP
var lastTs int64 = 0
for i := 0; i < len(req.Metrics); i++ {
m := req.Metrics[i]
if tsmap[m.TimeStamp] == nil {
tsmap[m.TimeStamp] = make(map[string][]metric)
}
mp := tsmap[m.TimeStamp]
mp[m.Name] = append(mp[m.Name], m)
if lastTs != m.TimeStamp { // there are still cases that can go wrong when the timestamps are totally out of order, but ignore that for now.
OrderInsert(m.TimeStamp, tslist)
}
lastTs = m.TimeStamp
}
for e := tslist.Front(); e != nil; e = e.Next() {
namemap, ok := tsmap[e.Value.(int64)]
if ok {
for _, v := range namemap {
ProcessData(v, dbname, addr)
}
} else {
info := "ProcessReq: cannot retrieve map"
panic(info)
}
}
return nil
}
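// SerilizeTDengine creates the per-field table the first time a field is seen; on later calls it enqueues an insert fragment for the field's value.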
func SerilizeTDengine(m metric, dbn string, hostip string, taglist *list.List, db *sql.DB) error {
var tbna []string
for _, v := range m.Tags {
tbna = append(tbna, v)
}
sort.Strings(tbna) // Go map iteration order is random, so sort tag values for a stable table name
tbn := strings.Join(tbna, "")
for k, v := range m.Fields {
s := m.Name + tbn + hostip + k
//fmt.Print(s)
s = "MD5_" + md5V2(s)
_, ok := IsTableCreated.Load(s)
if !ok {
var sqlcmd string
switch v.(type) {
case string:
sqlcmd = "create table if not exists " + s + " using " + m.Name + "_str tags("
default:
sqlcmd = "create table if not exists " + s + " using " + m.Name + " tags("
}
for e := taglist.Front(); e != nil; e = e.Next() {
tagvalue, has := m.Tags[e.Value.(string)]
if len(tagvalue) >= 60 {
tagvalue = tagvalue[:59]
}
if has {
sqlcmd = sqlcmd + "\"" + tagvalue + "\","
} else {
sqlcmd = sqlcmd + "null,"
}
}
sqlcmd = sqlcmd + "\"" + hostip + "\"," + "\"" + k + "\")\n"
execSql(dbn, sqlcmd, db)
IsTableCreated.Store(s, true)
} else {
idx := TAOShashID([]byte(s))
sqlcmd := " " + s + " values("
tls := strconv.FormatInt(m.TimeStamp, 10)
switch v.(type) {
case string:
sqlcmd = sqlcmd + tls + ",\"" + v.(string) + "\")"
case int64:
sqlcmd = sqlcmd + tls + "," + strconv.FormatInt(v.(int64), 10) + ")"
case float64:
sqlcmd = sqlcmd + tls + "," + strconv.FormatFloat(v.(float64), 'E', -1, 64) + ")"
default:
panic("Checktable error value type")
}
batchChans[idx%sqlworkers] <- sqlcmd
//execSql(dbn,sqlcmd)
}
}
return nil
}
func ProcessData(ts []metric, dbn string, hostip string) error {
db, err := sql.Open(taosDriverName, dbuser+":"+dbpassword+"@/tcp("+daemonUrl+")/"+dbname)
if err != nil {
blmLog.Fatalf("Open database error: %s\n", err)
}
defer db.Close()
schema, ok := IsSTableCreated.Load(ts[0].Name)
if !ok {
var nt nametag
nt.taglist = list.New()
nt.tagmap = make(map[string]string)
IsSTableCreated.Store(ts[0].Name, nt)
tagmap := nt.tagmap
taglist := nt.taglist
for i := 0; i < len(ts); i++ {
for k := range ts[i].Tags {
_, ok := tagmap[k]
if !ok {
taglist.PushBack(k)
tagmap[k] = "y"
}
}
}
var sqlcmd string
sqlcmd = "create table if not exists " + ts[0].Name + " (ts timestamp, value double) tags("
sqlcmd1 := "create table if not exists " + ts[0].Name + "_str (ts timestamp, value binary(256)) tags("
for e := taglist.Front(); e != nil; e = e.Next() {
sqlcmd = sqlcmd + e.Value.(string) + tagstr + ","
sqlcmd1 = sqlcmd1 + e.Value.(string) + tagstr + ","
}
sqlcmd = sqlcmd + "srcip binary(20), field binary(40))\n"
sqlcmd1 = sqlcmd1 + "srcip binary(20), field binary(40))\n"
execSql(dbn, sqlcmd, db)
execSql(dbn, sqlcmd1, db)
for i := 0; i < len(ts); i++ {
SerilizeTDengine(ts[i], dbn, hostip, taglist, db)
}
return nil
}
nt := schema.(nametag)
tagmap := nt.tagmap
taglist := nt.taglist
var sqlcmd, sqlcmd1 string
for i := 0; i < len(ts); i++ {
for k := range ts[i].Tags {
_, ok := tagmap[k]
if !ok {
sqlcmd = sqlcmd + "alter table " + ts[0].Name + " add tag " + k + tagstr + "\n"
sqlcmd1 = sqlcmd1 + "alter table " + ts[0].Name + "_str add tag " + k + tagstr + "\n"
taglist.PushBack(k)
tagmap[k] = "y"
}
}
}
execSql(dbn, sqlcmd, db)
execSql(dbn, sqlcmd1, db)
for i := 0; i < len(ts); i++ {
SerilizeTDengine(ts[i], dbn, hostip, taglist, db)
}
return nil
}
func createDatabase(dbname string) {
db, err := sql.Open(taosDriverName, dbuser+":"+dbpassword+"@/tcp("+daemonUrl+")/")
if err != nil {
log.Fatalf("Open database error: %s\n", err)
}
defer db.Close()
sqlcmd := fmt.Sprintf("create database if not exists %s", dbname)
_, err = db.Exec(sqlcmd)
checkErr(err)
sqlcmd = fmt.Sprintf("use %s", dbname)
_, err = db.Exec(sqlcmd)
checkErr(err)
return
}
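// execSql executes sqlcmd on db, retrying up to two more times at one-second intervals if it fails.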
func execSql(dbname string, sqlcmd string, db *sql.DB) {
if len(sqlcmd) < 1 {
return
}
_, err := db.Exec(sqlcmd)
if err != nil {
var count int = 2
for {
if err != nil && count > 0 {
<-time.After(time.Second * 1)
_, err = db.Exec(sqlcmd)
count--
} else {
if err != nil {
blmLog.Printf("execSql Error: %s sqlcmd: %s\n", err, sqlcmd)
return
}
break
}
}
}
return
}
func checkErr(err error) {
if err != nil {
blmLog.Println(err)
}
}
func md5V2(str string) string {
data := []byte(str)
has := md5.Sum(data)
md5str := fmt.Sprintf("%x", has)
return md5str
}
func processBatches(iworker int) {
var i int
db, err := sql.Open(taosDriverName, dbuser+":"+dbpassword+"@/tcp("+daemonUrl+")/"+dbname)
if err != nil {
blmLog.Printf("processBatches Open database error: %s\n", err)
var count int = 5
for {
if err != nil && count > 0 {
<-time.After(time.Second * 1)
db, err = sql.Open(taosDriverName, dbuser+":"+dbpassword+"@/tcp("+daemonUrl+")/"+dbname)
count--
} else {
if err != nil {
blmLog.Printf("processBatches Error: %s open database\n", err)
return
}
break
}
}
}
defer db.Close()
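// sqlcmd[0] holds the "Insert into" prefix; each worker accumulates fragments from its channel and flushes one multi-table INSERT once batchSize fragments are queued.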
sqlcmd := make([]string, batchSize+1)
i = 0
sqlcmd[i] = "Insert into"
i++
for onepoint := range batchChans[iworker] {
sqlcmd[i] = onepoint
i++
if i > batchSize {
i = 1
_, err := db.Exec(strings.Join(sqlcmd, ""))
if err != nil {
var count int = 2
for {
if err != nil && count > 0 {
<-time.After(time.Second * 1)
_, err = db.Exec(strings.Join(sqlcmd, ""))
count--
} else {
if err != nil {
blmLog.Printf("Error: %s sqlcmd: %s\n", err, strings.Join(sqlcmd, ""))
}
break
}
}
}
}
}
if i > 1 {
i = 1
_, err := db.Exec(strings.Join(sqlcmd, ""))
if err != nil {
var count int = 2
for {
if err != nil && count > 0 {
<-time.After(time.Second * 1)
_, err = db.Exec(strings.Join(sqlcmd, ""))
count--
} else {
if err != nil {
blmLog.Printf("Error: %s sqlcmd: %s\n", err, strings.Join(sqlcmd, ""))
}
break
}
}
}
}
workersGroup.Done()
}