提交 5dd0158e 编写于 作者: weixin_48148422's avatar weixin_48148422 提交者: Bomin Zhang

td-40: move 'alert' project into tdengine

上级 a4e1157d
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
vendor/
# Project specific files
cmd/alert/alert
cmd/alert/alert.log
*.db
*.gz
\ No newline at end of file
# Alert [DRAFT]
The Alert application reads data from [TDEngine](https://www.taosdata.com/), calculating according to predefined rules to generate alerts, and pushes alerts to downstream applications like [AlertManager](https://github.com/prometheus/alertmanager).
## Install
### From Binary
TODO: 安装包具体位置
Precompiled binaries is available at [taosdata website](https://www.taosdata.com/), please download and unpack it by below shell command.
```
$ tar -xzf alert-$version-$OS-$ARCH.tar.gz
```
### From Source Code
Two prerequisites are required to install from source.
1. TDEngine server or client must be installed.
2. Latest [Go](https://golang.org) language must be installed.
When these two prerequisites are ready, please follow steps below to build the application:
```
$ mkdir taosdata
$ cd taosdata
$ git clone https://github.com/taosdata/tdengine.git
$ cd tdengine/alert/cmd/alert
$ go build
```
If `go build` fails because some of the dependency packages cannot be downloaded, please follow steps in [goproxy.io](https://goproxy.io) to configure `GOPROXY` and try `go build` again.
## Configure
The configuration file format of Alert application is standard `json`, below is its default content, please revise according to actual scenario.
```json
{
"port": 8100,
"database": "file:alert.db",
"tdengine": "root:taosdata@/tcp(127.0.0.1:0)/",
"log": {
"level": "production",
"path": "alert.log"
},
"receivers": {
"alertManager": "http://127.0.0.1:9093/api/v1/alerts",
"console": true
}
}
```
The use of each configuration item is:
* **port**: This is the `http` service port which enables other application to manage rules by `restful API`.
* **database**: rules are stored in a `sqlite` database, this is the path of the database file (if the file does not exist, the alert application creates it automatically).
* **tdengine**: connection string of `TDEngine` server, note in most cases the database information should be put in a rule, thus it should NOT be included here.
* **log > level**: log level, could be `production` or `debug`.
* **log > path**: log output file path.
* **receivers > alertManager**: the alert application pushes alerts to `AlertManager` at this URL.
* **receivers > console**: print out alerts to console (stdout) or not.
When the configruation file is ready, the alert application can be started with below command (`alert.cfg` is the path of the configuration file):
```
$ ./alert -cfg alert.cfg
```
## Prepare an alert rule
From technical aspect, an alert could be defined as: query and filter recent data from `TDEngine`, and calculating out a boolean value from these data according to a formula, and trigger an alert if the boolean value last for a certain duration.
This is a rule example in `json` format:
```json
{
"name": "rule1",
"sql": "select sum(col1) as sumCol1 from test.meters where ts > now - 1h group by areaid",
"expr": "sumCol1 > 10",
"for": "10m",
"period": "1m",
"labels": {
"ruleName": "rule1"
},
"annotations": {
"summary": "sum of rule {{$labels.ruleName}} of area {{$values.areaid}} is {{$values.sumCol1}}"
}
}
```
The fields of the rule is explained below:
* **name**: the name of the rule, must be unique.
* **sql**: this is the `sql` statement used to query data from `TDEngine`, columns of the query result are used in later processing, so please give the column an alias if aggregation functions are used.
* **expr**: an expression whose result is a boolean value, arithmatic and logical calculations can be included in the expression, and builtin functions (see below) are also supported. Alerts are only triggered when the expression evaluates to `true`.
* **for**: this item is a duration which default value is zero second. when `expr` evaluates to `true` and last at least this duration, an alert is triggered.
* **period**: the interval for the alert application to check the rule, default is 1 minute.
* **labels**: a label list, labels are used to generate alert information. note if the `sql` statement includes a `group by` clause, the `group by` columns are inserted into this list automatically.
* **annotations**: the template of alert information which is in [go template](https://golang.org/pkg/text/template) syntax, labels can be referenced by `$labels.<label name>` and columns of the query result can be referenced by `$values.<column name>`.
### Operators
Operators which can be used in the `expr` field of a rule are list below, `()` can be to change precedence if default does not meet requirement.
<table>
<thead>
<tr> <td>Operator</td><td>Unary/Binary</td><td>Precedence</td><td>Effect</td> </tr>
</thead>
<tbody>
<tr> <td>~</td><td>Unary</td><td>6</td><td>Bitwise Not</td> </tr>
<tr> <td>!</td><td>Unary</td><td>6</td><td>Logical Not</td> </tr>
<tr> <td>+</td><td>Unary</td><td>6</td><td>Positive Sign</td> </tr>
<tr> <td>-</td><td>Unary</td><td>6</td><td>Negative Sign</td> </tr>
<tr> <td>*</td><td>Binary</td><td>5</td><td>Multiplication</td> </tr>
<tr> <td>/</td><td>Binary</td><td>5</td><td>Division</td> </tr>
<tr> <td>%</td><td>Binary</td><td>5</td><td>Modulus</td> </tr>
<tr> <td><<</td><td>Binary</td><td>5</td><td>Bitwise Left Shift</td> </tr>
<tr> <td>>></td><td>Binary</td><td>5</td><td>Bitwise Right Shift</td> </tr>
<tr> <td>&</td><td>Binary</td><td>5</td><td>Bitwise And</td> </tr>
<tr> <td>+</td><td>Binary</td><td>4</td><td>Addition</td> </tr>
<tr> <td>-</td><td>Binary</td><td>4</td><td>Subtraction</td> </tr>
<tr> <td>|</td><td>Binary</td><td>4</td><td>Bitwise Or</td> </tr>
<tr> <td>^</td><td>Binary</td><td>4</td><td>Bitwise Xor</td> </tr>
<tr> <td>==</td><td>Binary</td><td>3</td><td>Equal</td> </tr>
<tr> <td>!=</td><td>Binary</td><td>3</td><td>Not Equal</td> </tr>
<tr> <td><</td><td>Binary</td><td>3</td><td>Less Than</td> </tr>
<tr> <td><=</td><td>Binary</td><td>3</td><td>Less Than or Equal</td> </tr>
<tr> <td>></td><td>Binary</td><td>3</td><td>Great Than</td> </tr>
<tr> <td>>=</td><td>Binary</td><td>3</td><td>Great Than or Equal</td> </tr>
<tr> <td>&&</td><td>Binary</td><td>2</td><td>Logical And</td> </tr>
<tr> <td>||</td><td>Binary</td><td>1</td><td>Logical Or</td> </tr>
</tbody>
</table>
### Built-in Functions
Built-in function can be used in the `expr` field of a rule.
* **min**: returns the minimum one of its arguments, for example: `min(1, 2, 3)` returns `1`.
* **max**: returns the maximum one of its arguments, for example: `max(1, 2, 3)` returns `3`.
* **sum**: returns the sum of its arguments, for example: `sum(1, 2, 3)` returns `6`.
* **avg**: returns the average of its arguments, for example: `avg(1, 2, 3)` returns `2`.
* **sqrt**: returns the square root of its argument, for example: `sqrt(9)` returns `3`.
* **ceil**: returns the minimum integer which greater or equal to its argument, for example: `ceil(9.1)` returns `10`.
* **floor**: returns the maximum integer which lesser or equal to its argument, for example: `floor(9.9)` returns `9`.
* **round**: round its argument to nearest integer, for examples: `round(9.9)` returns `10` and `round(9.1)` returns `9`.
* **log**: returns the natural logarithm of its argument, for example: `log(10)` returns `2.302585`.
* **log10**: returns base 10 logarithm of its argument, for example: `log10(10)` return `1`.
* **abs**: returns the absolute value of its argument, for example: `abs(-1)` returns `1`.
* **if**: if the first argument is `true` returns its second argument, and returns its third argument otherwise, for examples: `if(true, 10, 100)` returns `10` and `if(false, 10, 100)` returns `100`.
## Rule Management
* Add / Update
* API address: http://\<server\>:\<port\>/api/update-rule
* Method: POST
* Body: the rule
* Example:curl -d '@rule.json' http://localhost:8100/api/update-rule
* Delete
* API address: http://\<server\>:\<port\>/api/delete-rule?name=\<rule name\>
* Method:DELETE
* Example:curl -X DELETE http://localhost:8100/api/delete-rule?name=rule1
* Enable / Disable
* API address: http://\<server\>:\<port\>/api/enable-rule?name=\<rule name\>&enable=[true | false]
* Method POST
* Example:curl -X POST http://localhost:8100/api/enable-rule?name=rule1&enable=true
* Retrieve rule list
* API address: http://\<server\>:\<port\>/api/list-rule
* Method: GET
* Example:curl http://localhost:8100/api/list-rule
# Alert [草稿]
报警监测程序,从 [TDEngine](https://www.taosdata.com/) 读取数据后,根据预定义的规则计算和生成报警,并将它们推送到 [AlertManager](https://github.com/prometheus/alertmanager) 或其它下游应用。
## 安装
### 使用编译好的二进制文件
TODO: 安装包具体位置
您可以从 [涛思数据](https://www.taosdata.com/) 官网下载最新的安装包。下载完成后,使用以下命令解压:
```
$ tar -xzf alert-$version-$OS-$ARCH.tar.gz
```
### 从源码安装
从源码安装需要在您用于编译的计算机上提前安装好 TDEngine 的服务端或客户端,如果您还没有安装,可以参考 TDEngine 的文档。
报警监测程序使用 [Go语言](https://golang.org) 开发,请安装最新版的 Go 语言编译环境。
```
$ mkdir taosdata
$ cd taosdata
$ git clone https://github.com/taosdata/tdengine.git
$ cd tdengine/alert/cmd/alert
$ go build
```
如果由于部分包无法下载导致 `go build` 失败,请根据 [goproxy.io](https://goproxy.io) 上的说明配置好 `GOPROXY` 再重新执行 `go build`
## 配置
报警监测程序的配置文件采用标准`json`格式,下面是默认的文件内容,请根据实际情况修改。
```json
{
"port": 8100,
"database": "file:alert.db",
"tdengine": "root:taosdata@/tcp(127.0.0.1:0)/",
"log": {
"level": "production",
"path": "alert.log"
},
"receivers": {
"alertManager": "http://127.0.0.1:9093/api/v1/alerts",
"console": true
}
}
```
其中:
* **port**:报警监测程序支持使用 `restful API` 对规则进行管理,这个参数用于配置 `http` 服务的侦听端口。
* **database**:报警监测程序将规则保存到了一个 `sqlite` 数据库中,这个参数用于指定数据库文件的路径(不需要提前创建这个文件,如果它不存在,程序会自动创建它)。
* **tdengine**`TDEngine` 的连接信息,一般来说,数据库信息应该在报警规则中指定,所以这里 **不** 应包含这一部分信息。
* **log > level**:日志的记录级别,可选 `production``debug`
* **log > path**:日志文件的路径。
* **receivers > alertManager**:报警监测程序会将报警推送到 `AlertManager`,在这里指定 `AlertManager` 的接收地址。
* **receivers > console**:是否输出到控制台 (stdout)。
准备好配置文件后,可使用下面的命令启动报警监测程序( `alert.cfg` 是配置文件的路径):
```
$ ./alert -cfg alert.cfg
```
## 编写报警规则
从技术角度,可以将报警描述为:从 `TDEngine` 中查询最近一段时间、符合一定过滤条件的数据,并基于这些数据根据定义好的计算方法得出一个结果,当结果符合某个条件且持续一定时间后,触发报警。
根据上面的描述,可以很容易的知道报警规则中需要包含的大部分信息。 以下是一个完整的报警规则,采用标准 `json` 格式:
```json
{
"name": "rule1",
"sql": "select sum(col1) as sumCol1 from test.meters where ts > now - 1h group by areaid",
"expr": "sumCol1 > 10",
"for": "10m",
"period": "1m",
"labels": {
"ruleName": "rule1"
},
"annotations": {
"summary": "sum of rule {{$labels.ruleName}} of area {{$values.areaid}} is {{$values.sumCol1}}"
}
}
```
其中:
* **name**:用于为规则指定一个唯一的名字。
* **sql**:从 `TDEngine` 中查询数据时使用的 `sql` 语句,查询结果中的列将被后续计算使用,所以,如果使用了聚合函数,请为这一列指定一个别名。
* **expr**:一个计算结果为布尔型的表达式,支持算数运算、逻辑运算,并且内置了部分函数,也可以引用查询结果中的列。 当表达式计算结果为 `true` 时,进入报警状态。
* **for**:当表达式计算结果为 `true` 的连续时长超过这个选项时,触发报警,否则报警处于“待定”状态。默认为0,表示一旦计算结果为 `true`,立即触发报警。
* **period**:规则的检查周期,默认1分钟。
* **labels**:人为指定的标签列表,标签可以在生成报警信息引用。如果 `sql` 中包含 `group by` 子句,则所有用于分组的字段会被自动加入这个标签列表中。
* **annotations**:用于定义报警信息,使用 [go template](https://golang.org/pkg/text/template) 语法,其中,可以通过 `$labels.<label name>` 引用标签,也可以通过 `$values.<column name>` 引用查询结果中的列。
### 运算符
以下是 `expr` 字段中支持的运算符,您可以使用 `()` 改变运算的优先级。
<table>
<thead>
<tr> <td>运算符</td><td>单目/双目</td><td>优先级</td><td>作用</td> </tr>
</thead>
<tbody>
<tr> <td>~</td><td>单目</td><td>6</td><td>按位取反</td> </tr>
<tr> <td>!</td><td>单目</td><td>6</td><td>逻辑非</td> </tr>
<tr> <td>+</td><td>单目</td><td>6</td><td>正号</td> </tr>
<tr> <td>-</td><td>单目</td><td>6</td><td>负号</td> </tr>
<tr> <td>*</td><td>双目</td><td>5</td><td>乘法</td> </tr>
<tr> <td>/</td><td>双目</td><td>5</td><td>除法</td> </tr>
<tr> <td>%</td><td>双目</td><td>5</td><td>取模(余数)</td> </tr>
<tr> <td><<</td><td>双目</td><td>5</td><td>按位左移</td> </tr>
<tr> <td>>></td><td>双目</td><td>5</td><td>按位右移</td> </tr>
<tr> <td>&</td><td>双目</td><td>5</td><td>按位与</td> </tr>
<tr> <td>+</td><td>双目</td><td>4</td><td>加法</td> </tr>
<tr> <td>-</td><td>双目</td><td>4</td><td>减法</td> </tr>
<tr> <td>|</td><td>双目</td><td>4</td><td>按位或</td> </tr>
<tr> <td>^</td><td>双目</td><td>4</td><td>按位异或</td> </tr>
<tr> <td>==</td><td>双目</td><td>3</td><td>等于</td> </tr>
<tr> <td>!=</td><td>双目</td><td>3</td><td>不等于</td> </tr>
<tr> <td><</td><td>双目</td><td>3</td><td>小于</td> </tr>
<tr> <td><=</td><td>双目</td><td>3</td><td>小于或等于</td> </tr>
<tr> <td>></td><td>双目</td><td>3</td><td>大于</td> </tr>
<tr> <td>>=</td><td>双目</td><td>3</td><td>大于或等于</td> </tr>
<tr> <td>&&</td><td>双目</td><td>2</td><td>逻辑与</td> </tr>
<tr> <td>||</td><td>双目</td><td>1</td><td>逻辑或</td> </tr>
</tbody>
</table>
### 内置函数
目前支持以下内置函数,可以在报警规则的 `expr` 字段中使用这些函数:
* **min**:取多个值中的最小值,例如 `min(1, 2, 3)` 返回 `1`
* **max**:取多个值中的最大值,例如 `max(1, 2, 3)` 返回 `3`
* **sum**:求和,例如 `sum(1, 2, 3)` 返回 `6`
* **avg**:求算术平均值,例如 `avg(1, 2, 3)` 返回 `2`
* **sqrt**:计算平方根,例如 `sqrt(9)` 返回 `3`
* **ceil**:上取整,例如 `ceil(9.1)` 返回 `10`
* **floor**:下取整,例如 `floor(9.9)` 返回 `9`
* **round**:四舍五入,例如 `round(9.9)` 返回 `10``round(9.1)` 返回 `9`
* **log**:计算自然对数,例如 `log(10)` 返回 `2.302585`
* **log10**:计算以10为底的对数,例如 `log10(10)` 返回 `1`
* **abs**:计算绝对值,例如 `abs(-1)` 返回 `1`
* **if**:如果第一个参数为 `true`,返回第二个参数,否则返回第三个参数,例如 `if(true, 10, 100)` 返回 `10``if(false, 10, 100)` 返回 `100`
## 规则管理
* 添加或修改
* API地址:http://\<server\>:\<port\>/api/update-rule
* Method:POST
* Body:规则定义
* 示例:curl -d '@rule.json' http://localhost:8100/api/update-rule
* 删除
* API地址:http://\<server\>:\<port\>/api/delete-rule?name=\<rule name\>
* Method:DELETE
* 示例:curl -X DELETE http://localhost:8100/api/delete-rule?name=rule1
* 挂起或恢复
* API地址:http://\<server\>:\<port\>/api/enable-rule?name=\<rule name\>&enable=[true | false]
* Method:POST
* 示例:curl -X POST http://localhost:8100/api/enable-rule?name=rule1&enable=true
* 获取列表
* API地址:http://\<server\>:\<port\>/api/list-rule
* Method:GET
* 示例:curl http://localhost:8100/api/list-rule
package app
import (
"encoding/json"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/taosdata/alert/models"
"github.com/taosdata/alert/utils"
"github.com/taosdata/alert/utils/log"
)
func Init() error {
if e := initRule(); e != nil {
return e
}
http.HandleFunc("/api/list-rule", onListRule)
http.HandleFunc("/api/list-alert", onListAlert)
http.HandleFunc("/api/update-rule", onUpdateRule)
http.HandleFunc("/api/enable-rule", onEnableRule)
http.HandleFunc("/api/delete-rule", onDeleteRule)
return nil
}
func Uninit() error {
uninitRule()
return nil
}
func onListRule(w http.ResponseWriter, r *http.Request) {
var res []*Rule
rules.Range(func(k, v interface{}) bool {
res = append(res, v.(*Rule))
return true
})
w.Header().Add("Content-Type", "application/json; charset=utf-8")
json.NewEncoder(w).Encode(res)
}
func onListAlert(w http.ResponseWriter, r *http.Request) {
var alerts []*Alert
rn := r.URL.Query().Get("rule")
rules.Range(func(k, v interface{}) bool {
if len(rn) > 0 && rn != k.(string) {
return true
}
rule := v.(*Rule)
rule.Alerts.Range(func(k, v interface{}) bool {
alert := v.(*Alert)
// TODO: not go-routine safe
if alert.State != AlertStateWaiting {
alerts = append(alerts, v.(*Alert))
}
return true
})
return true
})
w.Header().Add("Content-Type", "application/json; charset=utf-8")
json.NewEncoder(w).Encode(alerts)
}
func onUpdateRule(w http.ResponseWriter, r *http.Request) {
data, e := ioutil.ReadAll(r.Body)
if e != nil {
log.Error("failed to read request body: ", e.Error())
w.WriteHeader(http.StatusBadRequest)
return
}
rule, e := newRule(string(data))
if e != nil {
log.Error("failed to parse rule: ", e.Error())
w.WriteHeader(http.StatusBadRequest)
return
}
if e = doUpdateRule(rule, string(data)); e != nil {
w.WriteHeader(http.StatusInternalServerError)
}
}
func doUpdateRule(rule *Rule, ruleStr string) error {
if _, ok := rules.Load(rule.Name); ok {
if len(utils.Cfg.Database) > 0 {
e := models.UpdateRule(rule.Name, ruleStr)
if e != nil {
log.Errorf("[%s]: update failed: %s", rule.Name, e.Error())
return e
}
}
log.Infof("[%s]: update succeeded.", rule.Name)
} else {
if len(utils.Cfg.Database) > 0 {
e := models.AddRule(&models.Rule{
Name: rule.Name,
Content: ruleStr,
})
if e != nil {
log.Errorf("[%s]: add failed: %s", rule.Name, e.Error())
return e
}
}
log.Infof("[%s]: add succeeded.", rule.Name)
}
rules.Store(rule.Name, rule)
return nil
}
func onEnableRule(w http.ResponseWriter, r *http.Request) {
var rule *Rule
name := r.URL.Query().Get("name")
enable := strings.ToLower(r.URL.Query().Get("enable")) == "true"
if x, ok := rules.Load(name); ok {
rule = x.(*Rule)
} else {
w.WriteHeader(http.StatusNotFound)
return
}
if rule.isEnabled() == enable {
return
}
if len(utils.Cfg.Database) > 0 {
if e := models.EnableRule(name, enable); e != nil {
if enable {
log.Errorf("[%s]: enable failed: ", name, e.Error())
} else {
log.Errorf("[%s]: disable failed: ", name, e.Error())
}
w.WriteHeader(http.StatusInternalServerError)
return
}
}
if enable {
rule = rule.clone()
rule.setNextRunTime(time.Now())
rules.Store(rule.Name, rule)
log.Infof("[%s]: enable succeeded.", name)
} else {
rule.setState(RuleStateDisabled)
log.Infof("[%s]: disable succeeded.", name)
}
}
func onDeleteRule(w http.ResponseWriter, r *http.Request) {
name := r.URL.Query().Get("name")
if len(name) == 0 {
return
}
if e := doDeleteRule(name); e != nil {
w.WriteHeader(http.StatusInternalServerError)
}
}
func doDeleteRule(name string) error {
if len(utils.Cfg.Database) > 0 {
if e := models.DeleteRule(name); e != nil {
log.Errorf("[%s]: delete failed: %s", name, e.Error())
return e
}
}
rules.Delete(name)
log.Infof("[%s]: delete succeeded.", name)
return nil
}
package expr
import (
"errors"
"io"
"math"
"strconv"
"strings"
"text/scanner"
)
var (
// compile errors
ErrorExpressionSyntax = errors.New("expression syntax error")
ErrorUnrecognizedFunction = errors.New("unrecognized function")
ErrorArgumentCount = errors.New("too many/few arguments")
ErrorInvalidFloat = errors.New("invalid float")
ErrorInvalidInteger = errors.New("invalid integer")
// eval errors
ErrorUnsupportedDataType = errors.New("unsupported data type")
ErrorInvalidOperationFloat = errors.New("invalid operation for float")
ErrorInvalidOperationInteger = errors.New("invalid operation for integer")
ErrorInvalidOperationBoolean = errors.New("invalid operation for boolean")
ErrorOnlyIntegerAllowed = errors.New("only integers is allowed")
ErrorDataTypeMismatch = errors.New("data type mismatch")
)
// binary operator precedence
// 5 * / % << >> &
// 4 + - | ^
// 3 == != < <= > >=
// 2 &&
// 1 ||
const (
opOr = -(iota + 1000) // ||
opAnd // &&
opEqual // ==
opNotEqual // !=
opGTE // >=
opLTE // <=
opLeftShift // <<
opRightShift // >>
)
type lexer struct {
scan scanner.Scanner
tok rune
}
func (l *lexer) init(src io.Reader) {
l.scan.Error = func(s *scanner.Scanner, msg string) {
panic(errors.New(msg))
}
l.scan.Mode = scanner.ScanIdents | scanner.ScanInts | scanner.ScanFloats | scanner.ScanStrings
l.scan.Init(src)
l.tok = l.next()
}
func (l *lexer) next() rune {
l.tok = l.scan.Scan()
switch l.tok {
case '|':
if l.scan.Peek() == '|' {
l.tok = opOr
l.scan.Scan()
}
case '&':
if l.scan.Peek() == '&' {
l.tok = opAnd
l.scan.Scan()
}
case '=':
if l.scan.Peek() == '=' {
l.tok = opEqual
l.scan.Scan()
} else {
// TODO: error
}
case '!':
if l.scan.Peek() == '=' {
l.tok = opNotEqual
l.scan.Scan()
} else {
// TODO: error
}
case '<':
if tok := l.scan.Peek(); tok == '<' {
l.tok = opLeftShift
l.scan.Scan()
} else if tok == '=' {
l.tok = opLTE
l.scan.Scan()
}
case '>':
if tok := l.scan.Peek(); tok == '>' {
l.tok = opRightShift
l.scan.Scan()
} else if tok == '=' {
l.tok = opGTE
l.scan.Scan()
}
}
return l.tok
}
func (l *lexer) token() rune {
return l.tok
}
func (l *lexer) text() string {
switch l.tok {
case opOr:
return "||"
case opAnd:
return "&&"
case opEqual:
return "=="
case opNotEqual:
return "!="
case opLeftShift:
return "<<"
case opLTE:
return "<="
case opRightShift:
return ">>"
case opGTE:
return ">="
default:
return l.scan.TokenText()
}
}
type Expr interface {
Eval(env func(string) interface{}) interface{}
}
type unaryExpr struct {
op rune
subExpr Expr
}
func (ue *unaryExpr) Eval(env func(string) interface{}) interface{} {
val := ue.subExpr.Eval(env)
switch v := val.(type) {
case float64:
if ue.op != '-' {
panic(ErrorInvalidOperationFloat)
}
return -v
case int64:
switch ue.op {
case '-':
return -v
case '~':
return ^v
default:
panic(ErrorInvalidOperationInteger)
}
case bool:
if ue.op != '!' {
panic(ErrorInvalidOperationBoolean)
}
return !v
default:
panic(ErrorUnsupportedDataType)
}
}
type binaryExpr struct {
op rune
lhs Expr
rhs Expr
}
func (be *binaryExpr) Eval(env func(string) interface{}) interface{} {
lval := be.lhs.Eval(env)
rval := be.rhs.Eval(env)
switch be.op {
case '*':
switch lv := lval.(type) {
case float64:
switch rv := rval.(type) {
case float64:
return lv * rv
case int64:
return lv * float64(rv)
case bool:
panic(ErrorInvalidOperationBoolean)
}
case int64:
switch rv := rval.(type) {
case float64:
return float64(lv) * rv
case int64:
return lv * rv
case bool:
panic(ErrorInvalidOperationBoolean)
}
case bool:
panic(ErrorInvalidOperationBoolean)
}
case '/':
switch lv := lval.(type) {
case float64:
switch rv := rval.(type) {
case float64:
if rv == 0 {
return math.Inf(int(lv))
} else {
return lv / rv
}
case int64:
if rv == 0 {
return math.Inf(int(lv))
} else {
return lv / float64(rv)
}
case bool:
panic(ErrorInvalidOperationBoolean)
}
case int64:
switch rv := rval.(type) {
case float64:
if rv == 0 {
return math.Inf(int(lv))
} else {
return float64(lv) / rv
}
case int64:
if rv == 0 {
return math.Inf(int(lv))
} else {
return lv / rv
}
case bool:
panic(ErrorInvalidOperationBoolean)
}
case bool:
panic(ErrorInvalidOperationBoolean)
}
case '%':
switch lv := lval.(type) {
case float64:
switch rv := rval.(type) {
case float64:
return math.Mod(lv, rv)
case int64:
return math.Mod(lv, float64(rv))
case bool:
panic(ErrorInvalidOperationBoolean)
}
case int64:
switch rv := rval.(type) {
case float64:
return math.Mod(float64(lv), rv)
case int64:
if rv == 0 {
return math.Inf(int(lv))
} else {
return lv % rv
}
case bool:
panic(ErrorInvalidOperationBoolean)
}
case bool:
panic(ErrorInvalidOperationBoolean)
}
case opLeftShift:
switch lv := lval.(type) {
case int64:
switch rv := rval.(type) {
case int64:
return lv << rv
default:
panic(ErrorOnlyIntegerAllowed)
}
default:
panic(ErrorOnlyIntegerAllowed)
}
case opRightShift:
switch lv := lval.(type) {
case int64:
switch rv := rval.(type) {
case int64:
return lv >> rv
default:
panic(ErrorOnlyIntegerAllowed)
}
default:
panic(ErrorOnlyIntegerAllowed)
}
case '&':
switch lv := lval.(type) {
case int64:
switch rv := rval.(type) {
case int64:
return lv & rv
default:
panic(ErrorOnlyIntegerAllowed)
}
default:
panic(ErrorOnlyIntegerAllowed)
}
case '+':
switch lv := lval.(type) {
case float64:
switch rv := rval.(type) {
case float64:
return lv + rv
case int64:
return lv + float64(rv)
case bool:
panic(ErrorInvalidOperationBoolean)
}
case int64:
switch rv := rval.(type) {
case float64:
return float64(lv) + rv
case int64:
return lv + rv
case bool:
panic(ErrorInvalidOperationBoolean)
}
case bool:
panic(ErrorInvalidOperationBoolean)
}
case '-':
switch lv := lval.(type) {
case float64:
switch rv := rval.(type) {
case float64:
return lv - rv
case int64:
return lv - float64(rv)
case bool:
panic(ErrorInvalidOperationBoolean)
}
case int64:
switch rv := rval.(type) {
case float64:
return float64(lv) - rv
case int64:
return lv - rv
case bool:
panic(ErrorInvalidOperationBoolean)
}
case bool:
panic(ErrorInvalidOperationBoolean)
}
case '|':
switch lv := lval.(type) {
case int64:
switch rv := rval.(type) {
case int64:
return lv | rv
default:
panic(ErrorOnlyIntegerAllowed)
}
default:
panic(ErrorOnlyIntegerAllowed)
}
case '^':
switch lv := lval.(type) {
case int64:
switch rv := rval.(type) {
case int64:
return lv ^ rv
default:
panic(ErrorOnlyIntegerAllowed)
}
default:
panic(ErrorOnlyIntegerAllowed)
}
case opEqual:
switch lv := lval.(type) {
case float64:
switch rv := rval.(type) {
case float64:
return lv == rv
case int64:
return lv == float64(rv)
case bool:
panic(ErrorDataTypeMismatch)
}
case int64:
switch rv := rval.(type) {
case float64:
return float64(lv) == rv
case int64:
return lv == rv
case bool:
panic(ErrorDataTypeMismatch)
}
case bool:
switch rv := rval.(type) {
case float64:
case int64:
case bool:
return lv == rv
}
}
case opNotEqual:
switch lv := lval.(type) {
case float64:
switch rv := rval.(type) {
case float64:
return lv != rv
case int64:
return lv != float64(rv)
case bool:
panic(ErrorDataTypeMismatch)
}
case int64:
switch rv := rval.(type) {
case float64:
return float64(lv) != rv
case int64:
return lv != rv
case bool:
panic(ErrorDataTypeMismatch)
}
case bool:
switch rv := rval.(type) {
case float64:
case int64:
case bool:
return lv != rv
}
}
case '<':
switch lv := lval.(type) {
case float64:
switch rv := rval.(type) {
case float64:
return lv < rv
case int64:
return lv < float64(rv)
case bool:
panic(ErrorDataTypeMismatch)
}
case int64:
switch rv := rval.(type) {
case float64:
return float64(lv) < rv
case int64:
return lv < rv
case bool:
panic(ErrorDataTypeMismatch)
}
case bool:
panic(ErrorInvalidOperationBoolean)
}
case opLTE:
switch lv := lval.(type) {
case float64:
switch rv := rval.(type) {
case float64:
return lv <= rv
case int64:
return lv <= float64(rv)
case bool:
panic(ErrorDataTypeMismatch)
}
case int64:
switch rv := rval.(type) {
case float64:
return float64(lv) <= rv
case int64:
return lv <= rv
case bool:
panic(ErrorDataTypeMismatch)
}
case bool:
panic(ErrorInvalidOperationBoolean)
}
case '>':
switch lv := lval.(type) {
case float64:
switch rv := rval.(type) {
case float64:
return lv > rv
case int64:
return lv > float64(rv)
case bool:
panic(ErrorDataTypeMismatch)
}
case int64:
switch rv := rval.(type) {
case float64:
return float64(lv) > rv
case int64:
return lv > rv
case bool:
panic(ErrorDataTypeMismatch)
}
case bool:
panic(ErrorInvalidOperationBoolean)
}
case opGTE:
switch lv := lval.(type) {
case float64:
switch rv := rval.(type) {
case float64:
return lv >= rv
case int64:
return lv >= float64(rv)
case bool:
panic(ErrorDataTypeMismatch)
}
case int64:
switch rv := rval.(type) {
case float64:
return float64(lv) >= rv
case int64:
return lv >= rv
case bool:
panic(ErrorDataTypeMismatch)
}
case bool:
panic(ErrorInvalidOperationBoolean)
}
case opAnd:
switch lv := lval.(type) {
case bool:
switch rv := rval.(type) {
case bool:
return lv && rv
default:
panic(ErrorOnlyIntegerAllowed)
}
default:
panic(ErrorOnlyIntegerAllowed)
}
case opOr:
switch lv := lval.(type) {
case bool:
switch rv := rval.(type) {
case bool:
return lv || rv
default:
panic(ErrorOnlyIntegerAllowed)
}
default:
panic(ErrorOnlyIntegerAllowed)
}
}
return nil
}
type funcExpr struct {
name string
args []Expr
}
func (fe *funcExpr) Eval(env func(string) interface{}) interface{} {
argv := make([]interface{}, 0, len(fe.args))
for _, arg := range fe.args {
argv = append(argv, arg.Eval(env))
}
return funcs[fe.name].call(argv)
}
type floatExpr struct {
val float64
}
func (fe *floatExpr) Eval(env func(string) interface{}) interface{} {
return fe.val
}
type intExpr struct {
val int64
}
func (ie *intExpr) Eval(env func(string) interface{}) interface{} {
return ie.val
}
type boolExpr struct {
val bool
}
func (be *boolExpr) Eval(env func(string) interface{}) interface{} {
return be.val
}
type stringExpr struct {
val string
}
func (se *stringExpr) Eval(env func(string) interface{}) interface{} {
return se.val
}
type varExpr struct {
name string
}
func (ve *varExpr) Eval(env func(string) interface{}) interface{} {
return env(ve.name)
}
func Compile(src string) (expr Expr, err error) {
defer func() {
switch x := recover().(type) {
case nil:
case error:
err = x
default:
}
}()
lexer := lexer{}
lexer.init(strings.NewReader(src))
expr = parseBinary(&lexer, 0)
if lexer.token() != scanner.EOF {
panic(ErrorExpressionSyntax)
}
return expr, nil
}
func precedence(op rune) int {
switch op {
case opOr:
return 1
case opAnd:
return 2
case opEqual, opNotEqual, '<', '>', opGTE, opLTE:
return 3
case '+', '-', '|', '^':
return 4
case '*', '/', '%', opLeftShift, opRightShift, '&':
return 5
}
return 0
}
// binary = unary ('+' binary)*
func parseBinary(lexer *lexer, lastPrec int) Expr {
lhs := parseUnary(lexer)
for {
op := lexer.token()
prec := precedence(op)
if prec <= lastPrec {
break
}
lexer.next() // consume operator
rhs := parseBinary(lexer, prec)
lhs = &binaryExpr{op: op, lhs: lhs, rhs: rhs}
}
return lhs
}
// unary = '+|-' expr | primary
func parseUnary(lexer *lexer) Expr {
flag := false
for tok := lexer.token(); ; tok = lexer.next() {
if tok == '-' {
flag = !flag
} else if tok != '+' {
break
}
}
if flag {
return &unaryExpr{op: '-', subExpr: parsePrimary(lexer)}
}
flag = false
for tok := lexer.token(); tok == '!'; tok = lexer.next() {
flag = !flag
}
if flag {
return &unaryExpr{op: '!', subExpr: parsePrimary(lexer)}
}
flag = false
for tok := lexer.token(); tok == '~'; tok = lexer.next() {
flag = !flag
}
if flag {
return &unaryExpr{op: '~', subExpr: parsePrimary(lexer)}
}
return parsePrimary(lexer)
}
// primary = id
// | id '(' expr ',' ... ',' expr ')'
// | num
// | '(' expr ')'
func parsePrimary(lexer *lexer) Expr {
switch lexer.token() {
case '+', '-', '!', '~':
return parseUnary(lexer)
case '(':
lexer.next() // consume '('
node := parseBinary(lexer, 0)
if lexer.token() != ')' {
panic(ErrorExpressionSyntax)
}
lexer.next() // consume ')'
return node
case scanner.Ident:
id := strings.ToLower(lexer.text())
if lexer.next() != '(' {
if id == "true" {
return &boolExpr{val: true}
} else if id == "false" {
return &boolExpr{val: false}
} else {
return &varExpr{name: id}
}
}
node := funcExpr{name: id}
for lexer.next() != ')' {
arg := parseBinary(lexer, 0)
node.args = append(node.args, arg)
if lexer.token() != ',' {
break
}
}
if lexer.token() != ')' {
panic(ErrorExpressionSyntax)
}
if fn, ok := funcs[id]; !ok {
panic(ErrorUnrecognizedFunction)
} else if fn.minArgs >= 0 && len(node.args) < fn.minArgs {
panic(ErrorArgumentCount)
} else if fn.maxArgs >= 0 && len(node.args) > fn.maxArgs {
panic(ErrorArgumentCount)
}
lexer.next() // consume it
return &node
case scanner.Int:
val, e := strconv.ParseInt(lexer.text(), 0, 64)
if e != nil {
panic(ErrorInvalidFloat)
}
lexer.next()
return &intExpr{val: val}
case scanner.Float:
val, e := strconv.ParseFloat(lexer.text(), 0)
if e != nil {
panic(ErrorInvalidInteger)
}
lexer.next()
return &floatExpr{val: val}
case scanner.String:
panic(errors.New("strings are not allowed in expression at present"))
val := lexer.text()
lexer.next()
return &stringExpr{val: val}
default:
panic(ErrorExpressionSyntax)
}
}
package expr
import "testing"
func TestIntArithmetic(t *testing.T) {
cases := []struct {
expr string
expected int64
}{
{"+10", 10},
{"-10", -10},
{"3 + 4 + 5 + 6 * 7 + 8", 62},
{"3 + 4 + (5 + 6) * 7 + 8", 92},
{"3 + 4 + (5 + 6) * 7 / 11 + 8", 22},
{"3 + 4 + -5 * 6 / 7 % 8", 3},
{"10 - 5", 5},
}
for _, c := range cases {
expr, e := Compile(c.expr)
if e != nil {
t.Errorf("failed to compile expression '%s': %s", c.expr, e.Error())
}
if res := expr.Eval(nil); res.(int64) != c.expected {
t.Errorf("result for expression '%s' is %v, but expected is %v", c.expr, res, c.expected)
}
}
}
func TestFloatArithmetic(t *testing.T) {
cases := []struct {
expr string
expected float64
}{
{"+10.5", 10.5},
{"-10.5", -10.5},
{"3.1 + 4.2 + 5 + 6 * 7 + 8", 62.3},
{"3.1 + 4.2 + (5 + 6) * 7 + 8.3", 92.6},
{"3.1 + 4.2 + (5.1 + 5.9) * 7 / 11 + 8", 22.3},
{"3.3 + 4.2 - 4.0 * 7.5 / 3", -2.5},
{"3.3 + 4.2 - 4 * 7.0 / 2", -6.5},
{"3.5/2.0", 1.75},
{"3.5/2", 1.75},
{"7 / 3.5", 2},
{"3.5 % 2.0", 1.5},
{"3.5 % 2", 1.5},
{"7 % 2.5", 2},
{"7.3 - 2", 5.3},
{"7 - 2.3", 4.7},
{"1 + 1.5", 2.5},
}
for _, c := range cases {
expr, e := Compile(c.expr)
if e != nil {
t.Errorf("failed to compile expression '%s': %s", c.expr, e.Error())
}
if res := expr.Eval(nil); res.(float64) != c.expected {
t.Errorf("result for expression '%s' is %v, but expected is %v", c.expr, res, c.expected)
}
}
}
func TestVariable(t *testing.T) {
variables := map[string]interface{}{
"a": int64(6),
"b": int64(7),
}
env := func(key string) interface{} {
return variables[key]
}
cases := []struct {
expr string
expected int64
}{
{"3 + 4 + (+5) + a * b + 8", 62},
{"3 + 4 + (5 + a) * b + 8", 92},
{"3 + 4 + (5 + a) * b / 11 + 8", 22},
}
for _, c := range cases {
expr, e := Compile(c.expr)
if e != nil {
t.Errorf("failed to compile expression '%s': %s", c.expr, e.Error())
}
if res := expr.Eval(env); res.(int64) != c.expected {
t.Errorf("result for expression '%s' is %v, but expected is %v", c.expr, res, c.expected)
}
}
}
func TestFunction(t *testing.T) {
variables := map[string]interface{}{
"a": int64(6),
"b": 7.0,
}
env := func(key string) interface{} {
return variables[key]
}
cases := []struct {
expr string
expected float64
}{
{"sum(3, 4, 5, a * b, 8)", 62},
{"sum(3, 4, (5 + a) * b, 8)", 92},
{"sum(3, 4, (5 + a) * b / 11, 8)", 22},
}
for _, c := range cases {
expr, e := Compile(c.expr)
if e != nil {
t.Errorf("failed to compile expression '%s': %s", c.expr, e.Error())
}
if res := expr.Eval(env); res.(float64) != c.expected {
t.Errorf("result for expression '%s' is %v, but expected is %v", c.expr, res, c.expected)
}
}
}
func TestLogical(t *testing.T) {
cases := []struct {
expr string
expected bool
}{
{"true", true},
{"false", false},
{"true == true", true},
{"true == false", false},
{"true != true", false},
{"true != false", true},
{"5 > 3", true},
{"5 < 3", false},
{"5.2 > 3", true},
{"5.2 < 3", false},
{"5 > 3.1", true},
{"5 < 3.1", false},
{"5.1 > 3.3", true},
{"5.1 < 3.3", false},
{"5 >= 3", true},
{"5 <= 3", false},
{"5.2 >= 3", true},
{"5.2 <= 3", false},
{"5 >= 3.1", true},
{"5 <= 3.1", false},
{"5.1 >= 3.3", true},
{"5.1 <= 3.3", false},
{"5 != 3", true},
{"5.2 != 3.2", true},
{"5.2 != 3", true},
{"5 != 3.2", true},
{"5 == 3", false},
{"5.2 == 3.2", false},
{"5.2 == 3", false},
{"5 == 3.2", false},
{"!(5 > 3)", false},
{"5>3 && 3>1", true},
{"5<3 || 3<1", false},
{"4<=4 || 3<1", true},
{"4<4 || 3>=1", true},
}
for _, c := range cases {
expr, e := Compile(c.expr)
if e != nil {
t.Errorf("failed to compile expression '%s': %s", c.expr, e.Error())
}
if res := expr.Eval(nil); res.(bool) != c.expected {
t.Errorf("result for expression '%s' is %v, but expected is %v", c.expr, res, c.expected)
}
}
}
func TestBitwise(t *testing.T) {
cases := []struct {
expr string
expected int64
}{
{"0x0C & 0x04", 0x04},
{"0x08 | 0x04", 0x0C},
{"0x0C ^ 0x04", 0x08},
{"0x01 << 2", 0x04},
{"0x04 >> 2", 0x01},
{"~0x04", ^0x04},
}
for _, c := range cases {
expr, e := Compile(c.expr)
if e != nil {
t.Errorf("failed to compile expression '%s': %s", c.expr, e.Error())
}
if res := expr.Eval(nil); res.(int64) != c.expected {
t.Errorf("result for expression '%s' is 0x%X, but expected is 0x%X", c.expr, res, c.expected)
}
}
}
package expr
import (
"math"
)
type builtInFunc struct {
minArgs, maxArgs int
call func([]interface{}) interface{}
}
func fnMin(args []interface{}) interface{} {
res := args[0]
for _, arg := range args[1:] {
switch v1 := res.(type) {
case int64:
switch v2 := arg.(type) {
case int64:
if v2 < v1 {
res = v2
}
case float64:
res = math.Min(float64(v1), v2)
default:
panic(ErrorUnsupportedDataType)
}
case float64:
switch v2 := arg.(type) {
case int64:
res = math.Min(v1, float64(v2))
case float64:
res = math.Min(v1, v2)
default:
panic(ErrorUnsupportedDataType)
}
default:
panic(ErrorUnsupportedDataType)
}
}
return res
}
func fnMax(args []interface{}) interface{} {
res := args[0]
for _, arg := range args[1:] {
switch v1 := res.(type) {
case int64:
switch v2 := arg.(type) {
case int64:
if v2 > v1 {
res = v2
}
case float64:
res = math.Max(float64(v1), v2)
default:
panic(ErrorUnsupportedDataType)
}
case float64:
switch v2 := arg.(type) {
case int64:
res = math.Max(v1, float64(v2))
case float64:
res = math.Max(v1, v2)
default:
panic(ErrorUnsupportedDataType)
}
default:
panic(ErrorUnsupportedDataType)
}
}
return res
}
func fnSum(args []interface{}) interface{} {
res := float64(0)
for _, arg := range args {
switch v := arg.(type) {
case int64:
res += float64(v)
case float64:
res += v
default:
panic(ErrorUnsupportedDataType)
}
}
return res
}
func fnAvg(args []interface{}) interface{} {
return fnSum(args).(float64) / float64(len(args))
}
func fnSqrt(args []interface{}) interface{} {
switch v := args[0].(type) {
case int64:
return math.Sqrt(float64(v))
case float64:
return math.Sqrt(v)
default:
panic(ErrorUnsupportedDataType)
}
}
func fnFloor(args []interface{}) interface{} {
switch v := args[0].(type) {
case int64:
return v
case float64:
return math.Floor(v)
default:
panic(ErrorUnsupportedDataType)
}
}
func fnCeil(args []interface{}) interface{} {
switch v := args[0].(type) {
case int64:
return v
case float64:
return math.Ceil(v)
default:
panic(ErrorUnsupportedDataType)
}
}
func fnRound(args []interface{}) interface{} {
switch v := args[0].(type) {
case int64:
return v
case float64:
return math.Round(v)
default:
panic(ErrorUnsupportedDataType)
}
}
func fnLog(args []interface{}) interface{} {
switch v := args[0].(type) {
case int64:
return math.Log(float64(v))
case float64:
return math.Log(v)
default:
panic(ErrorUnsupportedDataType)
}
}
func fnLog10(args []interface{}) interface{} {
switch v := args[0].(type) {
case int64:
return math.Log10(float64(v))
case float64:
return math.Log10(v)
default:
panic(ErrorUnsupportedDataType)
}
}
func fnAbs(args []interface{}) interface{} {
switch v := args[0].(type) {
case int64:
if v < 0 {
return -v
}
return v
case float64:
return math.Abs(v)
default:
panic(ErrorUnsupportedDataType)
}
}
func fnIf(args []interface{}) interface{} {
v, ok := args[0].(bool)
if !ok {
panic(ErrorUnsupportedDataType)
}
if v {
return args[1]
} else {
return args[2]
}
}
var funcs = map[string]builtInFunc{
"min": builtInFunc{minArgs: 1, maxArgs: -1, call: fnMin},
"max": builtInFunc{minArgs: 1, maxArgs: -1, call: fnMax},
"sum": builtInFunc{minArgs: 1, maxArgs: -1, call: fnSum},
"avg": builtInFunc{minArgs: 1, maxArgs: -1, call: fnAvg},
"sqrt": builtInFunc{minArgs: 1, maxArgs: 1, call: fnSqrt},
"ceil": builtInFunc{minArgs: 1, maxArgs: 1, call: fnCeil},
"floor": builtInFunc{minArgs: 1, maxArgs: 1, call: fnFloor},
"round": builtInFunc{minArgs: 1, maxArgs: 1, call: fnRound},
"log": builtInFunc{minArgs: 1, maxArgs: 1, call: fnLog},
"log10": builtInFunc{minArgs: 1, maxArgs: 1, call: fnLog10},
"abs": builtInFunc{minArgs: 1, maxArgs: 1, call: fnAbs},
"if": builtInFunc{minArgs: 3, maxArgs: 3, call: fnIf},
}
package expr
import (
"math"
"testing"
)
func TestMax(t *testing.T) {
cases := []struct {
args []interface{}
expected float64
}{
{[]interface{}{int64(1), int64(2), int64(3), int64(4), int64(5)}, 5},
{[]interface{}{int64(1), int64(2), float64(3), int64(4), float64(5)}, 5},
{[]interface{}{int64(-1), int64(-2), float64(-3), int64(-4), float64(-5)}, -1},
{[]interface{}{int64(-1), int64(-1), float64(-1), int64(-1), float64(-1)}, -1},
{[]interface{}{int64(-1), int64(0), float64(-1), int64(-1), float64(-1)}, 0},
}
for _, c := range cases {
r := fnMax(c.args)
switch v := r.(type) {
case int64:
if v != int64(c.expected) {
t.Errorf("max(%v) = %v, want %v", c.args, v, int64(c.expected))
}
case float64:
if v != c.expected {
t.Errorf("max(%v) = %v, want %v", c.args, v, c.expected)
}
default:
t.Errorf("unknown result type max(%v)", c.args)
}
}
}
func TestMin(t *testing.T) {
cases := []struct {
args []interface{}
expected float64
}{
{[]interface{}{int64(1), int64(2), int64(3), int64(4), int64(5)}, 1},
{[]interface{}{int64(5), int64(4), float64(3), int64(2), float64(1)}, 1},
{[]interface{}{int64(-1), int64(-2), float64(-3), int64(-4), float64(-5)}, -5},
{[]interface{}{int64(-1), int64(-1), float64(-1), int64(-1), float64(-1)}, -1},
{[]interface{}{int64(1), int64(0), float64(1), int64(1), float64(1)}, 0},
}
for _, c := range cases {
r := fnMin(c.args)
switch v := r.(type) {
case int64:
if v != int64(c.expected) {
t.Errorf("min(%v) = %v, want %v", c.args, v, int64(c.expected))
}
case float64:
if v != c.expected {
t.Errorf("min(%v) = %v, want %v", c.args, v, c.expected)
}
default:
t.Errorf("unknown result type min(%v)", c.args)
}
}
}
func TestSumAvg(t *testing.T) {
cases := []struct {
args []interface{}
expected float64
}{
{[]interface{}{int64(1)}, 1},
{[]interface{}{int64(1), int64(2), int64(3), int64(4), int64(5)}, 15},
{[]interface{}{int64(5), int64(4), float64(3), int64(2), float64(1)}, 15},
{[]interface{}{int64(-1), int64(-2), float64(-3), int64(-4), float64(-5)}, -15},
{[]interface{}{int64(-1), int64(-1), float64(-1), int64(-1), float64(-1)}, -5},
{[]interface{}{int64(1), int64(0), float64(1), int64(1), float64(1)}, 4},
}
for _, c := range cases {
r := fnSum(c.args)
switch v := r.(type) {
case float64:
if v != c.expected {
t.Errorf("sum(%v) = %v, want %v", c.args, v, c.expected)
}
default:
t.Errorf("unknown result type sum(%v)", c.args)
}
}
for _, c := range cases {
r := fnAvg(c.args)
expected := c.expected / float64(len(c.args))
switch v := r.(type) {
case float64:
if v != expected {
t.Errorf("avg(%v) = %v, want %v", c.args, v, expected)
}
default:
t.Errorf("unknown result type avg(%v)", c.args)
}
}
}
func TestSqrt(t *testing.T) {
cases := []struct {
arg interface{}
expected float64
}{
{int64(0), 0},
{int64(1), 1},
{int64(256), 16},
{10.0, math.Sqrt(10)},
{10000.0, math.Sqrt(10000)},
}
for _, c := range cases {
r := fnSqrt([]interface{}{c.arg})
switch v := r.(type) {
case float64:
if v != c.expected {
t.Errorf("sqrt(%v) = %v, want %v", c.arg, v, c.expected)
}
default:
t.Errorf("unknown result type sqrt(%v)", c.arg)
}
}
}
func TestFloor(t *testing.T) {
cases := []struct {
arg interface{}
expected float64
}{
{int64(0), 0},
{int64(1), 1},
{int64(-1), -1},
{10.4, 10},
{-10.4, -11},
{10.8, 10},
{-10.8, -11},
}
for _, c := range cases {
r := fnFloor([]interface{}{c.arg})
switch v := r.(type) {
case int64:
if v != int64(c.expected) {
t.Errorf("floor(%v) = %v, want %v", c.arg, v, int64(c.expected))
}
case float64:
if v != c.expected {
t.Errorf("floor(%v) = %v, want %v", c.arg, v, c.expected)
}
default:
t.Errorf("unknown result type floor(%v)", c.arg)
}
}
}
func TestCeil(t *testing.T) {
cases := []struct {
arg interface{}
expected float64
}{
{int64(0), 0},
{int64(1), 1},
{int64(-1), -1},
{10.4, 11},
{-10.4, -10},
{10.8, 11},
{-10.8, -10},
}
for _, c := range cases {
r := fnCeil([]interface{}{c.arg})
switch v := r.(type) {
case int64:
if v != int64(c.expected) {
t.Errorf("ceil(%v) = %v, want %v", c.arg, v, int64(c.expected))
}
case float64:
if v != c.expected {
t.Errorf("ceil(%v) = %v, want %v", c.arg, v, c.expected)
}
default:
t.Errorf("unknown result type ceil(%v)", c.arg)
}
}
}
func TestRound(t *testing.T) {
cases := []struct {
arg interface{}
expected float64
}{
{int64(0), 0},
{int64(1), 1},
{int64(-1), -1},
{10.4, 10},
{-10.4, -10},
{10.8, 11},
{-10.8, -11},
}
for _, c := range cases {
r := fnRound([]interface{}{c.arg})
switch v := r.(type) {
case int64:
if v != int64(c.expected) {
t.Errorf("round(%v) = %v, want %v", c.arg, v, int64(c.expected))
}
case float64:
if v != c.expected {
t.Errorf("round(%v) = %v, want %v", c.arg, v, c.expected)
}
default:
t.Errorf("unknown result type round(%v)", c.arg)
}
}
}
func TestLog(t *testing.T) {
cases := []struct {
arg interface{}
expected float64
}{
{int64(1), math.Log(1)},
{0.1, math.Log(0.1)},
{10.4, math.Log(10.4)},
{10.8, math.Log(10.8)},
}
for _, c := range cases {
r := fnLog([]interface{}{c.arg})
switch v := r.(type) {
case float64:
if v != c.expected {
t.Errorf("log(%v) = %v, want %v", c.arg, v, c.expected)
}
default:
t.Errorf("unknown result type log(%v)", c.arg)
}
}
}
func TestLog10(t *testing.T) {
cases := []struct {
arg interface{}
expected float64
}{
{int64(1), math.Log10(1)},
{0.1, math.Log10(0.1)},
{10.4, math.Log10(10.4)},
{10.8, math.Log10(10.8)},
{int64(100), math.Log10(100)},
}
for _, c := range cases {
r := fnLog10([]interface{}{c.arg})
switch v := r.(type) {
case float64:
if v != c.expected {
t.Errorf("log10(%v) = %v, want %v", c.arg, v, c.expected)
}
default:
t.Errorf("unknown result type log10(%v)", c.arg)
}
}
}
func TestAbs(t *testing.T) {
cases := []struct {
arg interface{}
expected float64
}{
{int64(1), 1},
{int64(0), 0},
{int64(-1), 1},
{10.4, 10.4},
{-10.4, 10.4},
}
for _, c := range cases {
r := fnAbs([]interface{}{c.arg})
switch v := r.(type) {
case int64:
if v != int64(c.expected) {
t.Errorf("abs(%v) = %v, want %v", c.arg, v, int64(c.expected))
}
case float64:
if v != c.expected {
t.Errorf("abs(%v) = %v, want %v", c.arg, v, c.expected)
}
default:
t.Errorf("unknown result type abs(%v)", c.arg)
}
}
}
func TestIf(t *testing.T) {
cases := []struct {
args []interface{}
expected float64
}{
{[]interface{}{true, int64(10), int64(20)}, 10},
{[]interface{}{false, int64(10), int64(20)}, 20},
{[]interface{}{true, 10.3, 20.6}, 10.3},
{[]interface{}{false, 10.3, 20.6}, 20.6},
{[]interface{}{true, int64(10), 20.6}, 10},
{[]interface{}{false, int64(10), 20.6}, 20.6},
}
for _, c := range cases {
r := fnIf(c.args)
switch v := r.(type) {
case int64:
if v != int64(c.expected) {
t.Errorf("if(%v) = %v, want %v", c.args, v, int64(c.expected))
}
case float64:
if v != c.expected {
t.Errorf("if(%v) = %v, want %v", c.args, v, c.expected)
}
default:
t.Errorf("unknown result type if(%v)", c.args)
}
}
}
package app
import (
"regexp"
"strings"
)
type RouteMatchCriteria struct {
Tag string `yaml:"tag"`
Value string `yaml:"match"`
Re *regexp.Regexp `yaml:"-"`
}
func (c *RouteMatchCriteria) UnmarshalYAML(unmarshal func(interface{}) error) error {
var v map[string]string
if e := unmarshal(&v); e != nil {
return e
}
for k, a := range v {
c.Tag = k
c.Value = a
if strings.HasPrefix(a, "re:") {
re, e := regexp.Compile(a[3:])
if e != nil {
return e
}
c.Re = re
}
}
return nil
}
type Route struct {
Continue bool `yaml:"continue"`
Receiver string `yaml:"receiver"`
GroupWait Duration `yaml:"group_wait"`
GroupInterval Duration `yaml:"group_interval"`
RepeatInterval Duration `yaml:"repeat_interval"`
GroupBy []string `yaml:"group_by"`
Match []RouteMatchCriteria `yaml:"match"`
Routes []*Route `yaml:"routes"`
}
package app
import (
"bytes"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"regexp"
"strings"
"sync"
"sync/atomic"
"text/scanner"
"text/template"
"time"
"github.com/taosdata/alert/app/expr"
"github.com/taosdata/alert/models"
"github.com/taosdata/alert/utils"
"github.com/taosdata/alert/utils/log"
)
type Duration struct{ time.Duration }
func (d Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(d.String())
}
func (d *Duration) doUnmarshal(v interface{}) error {
switch value := v.(type) {
case float64:
*d = Duration{time.Duration(value)}
return nil
case string:
if duration, e := time.ParseDuration(value); e != nil {
return e
} else {
*d = Duration{duration}
}
return nil
default:
return errors.New("invalid duration")
}
}
func (d *Duration) UnmarshalJSON(b []byte) error {
var v interface{}
if e := json.Unmarshal(b, &v); e != nil {
return e
}
return d.doUnmarshal(v)
}
const (
AlertStateWaiting = iota
AlertStatePending
AlertStateFiring
)
type Alert struct {
State uint8 `json:"-"`
LastRefreshAt time.Time `json:"-"`
StartsAt time.Time `json:"startsAt,omitempty"`
EndsAt time.Time `json:"endsAt,omitempty"`
Values map[string]interface{} `json:"values"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
}
func (alert *Alert) doRefresh(firing bool, rule *Rule) bool {
switch {
case (!firing) && (alert.State == AlertStateWaiting):
return false
case (!firing) && (alert.State == AlertStatePending):
alert.State = AlertStateWaiting
return false
case (!firing) && (alert.State == AlertStateFiring):
alert.State = AlertStateWaiting
alert.EndsAt = time.Now()
case firing && (alert.State == AlertStateWaiting):
alert.StartsAt = time.Now()
if rule.For.Nanoseconds() > 0 {
alert.State = AlertStatePending
return false
}
alert.State = AlertStateFiring
case firing && (alert.State == AlertStatePending):
if time.Now().Sub(alert.StartsAt) < rule.For.Duration {
return false
}
alert.StartsAt = alert.StartsAt.Add(rule.For.Duration)
alert.State = AlertStateFiring
case firing && (alert.State == AlertStateFiring):
}
return true
}
func (alert *Alert) refresh(rule *Rule, values map[string]interface{}) {
alert.LastRefreshAt = time.Now()
defer func() {
switch x := recover().(type) {
case nil:
case error:
rule.setState(RuleStateError)
log.Errorf("[%s]: failed to evaluate: %s", rule.Name, x.Error())
default:
rule.setState(RuleStateError)
log.Errorf("[%s]: failed to evaluate: unknown error", rule.Name)
}
}()
alert.Values = values
res := rule.Expr.Eval(func(key string) interface{} {
// ToLower is required as column name in result is in lower case
return alert.Values[strings.ToLower(key)]
})
val, ok := res.(bool)
if !ok {
rule.setState(RuleStateError)
log.Errorf("[%s]: result type is not bool", rule.Name)
return
}
if !alert.doRefresh(val, rule) {
return
}
buf := bytes.Buffer{}
alert.Annotations = map[string]string{}
for k, v := range rule.Annotations {
if e := v.Execute(&buf, alert); e != nil {
log.Errorf("[%s]: failed to generate annotation '%s': %s", rule.Name, k, e.Error())
} else {
alert.Annotations[k] = buf.String()
}
buf.Reset()
}
buf.Reset()
if e := json.NewEncoder(&buf).Encode(alert); e != nil {
log.Errorf("[%s]: failed to serialize alert to JSON: %s", rule.Name, e.Error())
} else {
chAlert <- buf.String()
}
}
const (
RuleStateNormal = iota
RuleStateError
RuleStateDisabled
RuleStateRunning = 0x04
)
type Rule struct {
Name string `json:"name"`
State uint32 `json:"state"`
SQL string `json:"sql"`
GroupByCols []string `json:"-"`
For Duration `json:"for"`
Period Duration `json:"period"`
NextRunTime time.Time `json:"-"`
RawExpr string `json:"expr"`
Expr expr.Expr `json:"-"`
Labels map[string]string `json:"labels"`
RawAnnotations map[string]string `json:"annotations"`
Annotations map[string]*template.Template `json:"-"`
Alerts sync.Map `json:"-"`
}
func (rule *Rule) clone() *Rule {
return &Rule{
Name: rule.Name,
State: RuleStateNormal,
SQL: rule.SQL,
GroupByCols: rule.GroupByCols,
For: rule.For,
Period: rule.Period,
NextRunTime: time.Time{},
RawExpr: rule.RawExpr,
Expr: rule.Expr,
Labels: rule.Labels,
RawAnnotations: rule.RawAnnotations,
Annotations: rule.Annotations,
// don't copy alerts
}
}
func (rule *Rule) setState(s uint32) {
for {
old := atomic.LoadUint32(&rule.State)
new := old&0xffffffc0 | s
if atomic.CompareAndSwapUint32(&rule.State, old, new) {
break
}
}
}
func (rule *Rule) state() uint32 {
return atomic.LoadUint32(&rule.State) & 0xffffffc0
}
func (rule *Rule) isEnabled() bool {
state := atomic.LoadUint32(&rule.State)
return state&RuleStateDisabled == 0
}
func (rule *Rule) setNextRunTime(tm time.Time) {
rule.NextRunTime = tm.Round(rule.Period.Duration)
if rule.NextRunTime.Before(tm) {
rule.NextRunTime = rule.NextRunTime.Add(rule.Period.Duration)
}
}
func parseGroupBy(sql string) (cols []string, err error) {
defer func() {
if e := recover(); e != nil {
err = e.(error)
}
}()
s := scanner.Scanner{
Error: func(s *scanner.Scanner, msg string) {
panic(errors.New(msg))
},
Mode: scanner.ScanIdents | scanner.ScanInts | scanner.ScanFloats,
}
s.Init(strings.NewReader(sql))
if s.Scan() != scanner.Ident || strings.ToLower(s.TokenText()) != "select" {
err = errors.New("only select statement is allowed.")
return
}
hasGroupBy := false
for t := s.Scan(); t != scanner.EOF; t = s.Scan() {
if t != scanner.Ident {
continue
}
if strings.ToLower(s.TokenText()) != "group" {
continue
}
if s.Scan() != scanner.Ident {
continue
}
if strings.ToLower(s.TokenText()) == "by" {
hasGroupBy = true
break
}
}
if !hasGroupBy {
return
}
for {
if s.Scan() != scanner.Ident {
err = errors.New("SQL statement syntax error.")
return
}
col := strings.ToLower(s.TokenText())
cols = append(cols, col)
if s.Scan() != ',' {
break
}
}
return
}
func (rule *Rule) parseGroupBy() (err error) {
cols, e := parseGroupBy(rule.SQL)
if e == nil {
rule.GroupByCols = cols
}
return nil
}
func (rule *Rule) getAlert(values map[string]interface{}) *Alert {
sb := strings.Builder{}
for _, name := range rule.GroupByCols {
value := values[name]
if value == nil {
} else {
sb.WriteString(fmt.Sprint(value))
}
sb.WriteByte('_')
}
var alert *Alert
key := sb.String()
if v, ok := rule.Alerts.Load(key); ok {
alert = v.(*Alert)
}
if alert == nil {
alert = &Alert{Labels: map[string]string{}}
for k, v := range rule.Labels {
alert.Labels[k] = v
}
for _, name := range rule.GroupByCols {
value := values[name]
if value == nil {
alert.Labels[name] = ""
} else {
alert.Labels[name] = fmt.Sprint(value)
}
}
rule.Alerts.Store(key, alert)
}
return alert
}
func (rule *Rule) preRun(tm time.Time) bool {
if tm.Before(rule.NextRunTime) {
return false
}
rule.setNextRunTime(tm)
for {
state := atomic.LoadUint32(&rule.State)
if state != RuleStateNormal {
return false
}
if atomic.CompareAndSwapUint32(&rule.State, state, RuleStateRunning) {
break
}
}
return true
}
func (rule *Rule) run(db *sql.DB) {
rows, e := db.Query(rule.SQL)
if e != nil {
log.Errorf("[%s]: failed to query TDengine: %s", rule.Name, e.Error())
return
}
cols, e := rows.ColumnTypes()
if e != nil {
log.Errorf("[%s]: unable to get column information: %s", rule.Name, e.Error())
return
}
for rows.Next() {
values := make([]interface{}, 0, len(cols))
for range cols {
var v interface{}
values = append(values, &v)
}
rows.Scan(values...)
m := make(map[string]interface{})
for i, col := range cols {
name := strings.ToLower(col.Name())
m[name] = *(values[i].(*interface{}))
}
alert := rule.getAlert(m)
alert.refresh(rule, m)
}
now := time.Now()
rule.Alerts.Range(func(k, v interface{}) bool {
alert := v.(*Alert)
if now.Sub(alert.LastRefreshAt) > rule.Period.Duration*10 {
rule.Alerts.Delete(k)
}
return true
})
}
func (rule *Rule) postRun() {
for {
old := atomic.LoadUint32(&rule.State)
new := old & ^uint32(RuleStateRunning)
if atomic.CompareAndSwapUint32(&rule.State, old, new) {
break
}
}
}
func newRule(str string) (*Rule, error) {
rule := Rule{}
e := json.NewDecoder(strings.NewReader(str)).Decode(&rule)
if e != nil {
return nil, e
}
if rule.Period.Nanoseconds() <= 0 {
rule.Period = Duration{time.Minute}
}
rule.setNextRunTime(time.Now())
if rule.For.Nanoseconds() < 0 {
rule.For = Duration{0}
}
if e = rule.parseGroupBy(); e != nil {
return nil, e
}
if expr, e := expr.Compile(rule.RawExpr); e != nil {
return nil, e
} else {
rule.Expr = expr
}
rule.Annotations = map[string]*template.Template{}
for k, v := range rule.RawAnnotations {
v = reValue.ReplaceAllStringFunc(v, func(s string) string {
// as column name in query result is always in lower case,
// we need to convert value reference in annotations to
// lower case
return strings.ToLower(s)
})
text := "{{$labels := .Labels}}{{$values := .Values}}" + v
tmpl, e := template.New(k).Parse(text)
if e != nil {
return nil, e
}
rule.Annotations[k] = tmpl
}
return &rule, nil
}
const (
batchSize = 1024
)
var (
rules sync.Map
wg sync.WaitGroup
chStop = make(chan struct{})
chAlert = make(chan string, batchSize)
reValue = regexp.MustCompile(`\$values\.[_a-zA-Z0-9]+`)
)
func runRules() {
defer wg.Done()
db, e := sql.Open("taosSql", utils.Cfg.TDengine)
if e != nil {
log.Fatal("failed to connect to TDengine: ", e.Error())
}
defer db.Close()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
LOOP:
for {
var tm time.Time
select {
case <-chStop:
close(chAlert)
break LOOP
case tm = <-ticker.C:
}
rules.Range(func(k, v interface{}) bool {
rule := v.(*Rule)
if !rule.preRun(tm) {
return true
}
wg.Add(1)
go func(rule *Rule) {
defer wg.Done()
defer rule.postRun()
rule.run(db)
}(rule)
return true
})
}
}
func doPushAlerts(alerts []string) {
defer wg.Done()
if len(utils.Cfg.Receivers.AlertManager) == 0 {
return
}
buf := bytes.Buffer{}
buf.WriteByte('[')
for i, alert := range alerts {
if i > 0 {
buf.WriteByte(',')
}
buf.WriteString(alert)
}
buf.WriteByte(']')
log.Debug(buf.String())
resp, e := http.DefaultClient.Post(utils.Cfg.Receivers.AlertManager, "application/json", &buf)
if e != nil {
log.Errorf("failed to push alerts to downstream: %s", e.Error())
return
}
resp.Body.Close()
}
func pushAlerts() {
defer wg.Done()
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
alerts := make([]string, 0, batchSize)
LOOP:
for {
select {
case alert := <-chAlert:
if utils.Cfg.Receivers.Console {
fmt.Print(alert)
}
if len(alert) == 0 {
if len(alerts) > 0 {
wg.Add(1)
doPushAlerts(alerts)
}
break LOOP
}
if len(alerts) == batchSize {
wg.Add(1)
go doPushAlerts(alerts)
alerts = make([]string, 0, batchSize)
}
alerts = append(alerts, alert)
case <-ticker.C:
if len(alerts) > 0 {
wg.Add(1)
go doPushAlerts(alerts)
alerts = make([]string, 0, batchSize)
}
}
}
}
func loadRuleFromDatabase() error {
allRules, e := models.LoadAllRule()
if e != nil {
log.Error("failed to load rules from database:", e.Error())
return e
}
count := 0
for _, r := range allRules {
rule, e := newRule(r.Content)
if e != nil {
log.Errorf("[%s]: parse failed: %s", r.Name, e.Error())
continue
}
if !r.Enabled {
rule.setState(RuleStateDisabled)
}
rules.Store(rule.Name, rule)
count++
}
log.Infof("total %d rules loaded", count)
return nil
}
func loadRuleFromFile() error {
f, e := os.Open(utils.Cfg.RuleFile)
if e != nil {
log.Error("failed to load rules from file:", e.Error())
return e
}
defer f.Close()
var allRules []Rule
e = json.NewDecoder(f).Decode(&allRules)
if e != nil {
log.Error("failed to parse rule file:", e.Error())
return e
}
for i := 0; i < len(allRules); i++ {
rule := &allRules[i]
rules.Store(rule.Name, rule)
}
log.Infof("total %d rules loaded", len(allRules))
return nil
}
func initRule() error {
if len(utils.Cfg.Database) > 0 {
if e := loadRuleFromDatabase(); e != nil {
return e
}
} else {
if e := loadRuleFromFile(); e != nil {
return e
}
}
wg.Add(2)
go runRules()
go pushAlerts()
return nil
}
func uninitRule() error {
close(chStop)
wg.Wait()
return nil
}
package app
import (
"fmt"
"testing"
"github.com/taosdata/alert/utils/log"
)
func TestParseGroupBy(t *testing.T) {
cases := []struct {
sql string
cols []string
}{
{
sql: "select * from a",
cols: []string{},
},
{
sql: "select * from a group by abc",
cols: []string{"abc"},
},
{
sql: "select * from a group by abc, def",
cols: []string{"abc", "def"},
},
{
sql: "select * from a Group by abc, def order by abc",
cols: []string{"abc", "def"},
},
}
for _, c := range cases {
cols, e := parseGroupBy(c.sql)
if e != nil {
t.Errorf("failed to parse sql '%s': %s", c.sql, e.Error())
}
for i := range cols {
if i >= len(c.cols) {
t.Errorf("count of group by columns of '%s' is wrong", c.sql)
}
if c.cols[i] != cols[i] {
t.Errorf("wrong group by columns for '%s'", c.sql)
}
}
}
}
func TestManagement(t *testing.T) {
const format = `{"name":"rule%d", "sql":"select count(*) as count from meters", "expr":"count>2"}`
log.Init()
for i := 0; i < 5; i++ {
s := fmt.Sprintf(format, i)
rule, e := newRule(s)
if e != nil {
t.Errorf("failed to create rule: %s", e.Error())
}
e = doUpdateRule(rule, s)
if e != nil {
t.Errorf("failed to add or update rule: %s", e.Error())
}
}
for i := 0; i < 5; i++ {
name := fmt.Sprintf("rule%d", i)
if _, ok := rules.Load(name); !ok {
t.Errorf("rule '%s' does not exist", name)
}
}
name := "rule1"
if e := doDeleteRule(name); e != nil {
t.Errorf("failed to delete rule: %s", e.Error())
}
if _, ok := rules.Load(name); ok {
t.Errorf("rule '%s' should not exist any more", name)
}
}
{
"port": 8100,
"database": "file:alert.db",
"tdengine": "root:taosdata@/tcp(127.0.0.1:0)/",
"log": {
"level": "debug",
"path": ""
},
"receivers": {
"alertManager": "http://127.0.0.1:9093/api/v1/alerts",
"console": true
}
}
package main
import (
"context"
"flag"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"path/filepath"
"runtime"
"strconv"
"time"
"github.com/taosdata/alert/app"
"github.com/taosdata/alert/models"
"github.com/taosdata/alert/utils"
"github.com/taosdata/alert/utils/log"
_ "github.com/mattn/go-sqlite3"
_ "github.com/taosdata/driver-go/taosSql"
)
type httpHandler struct {
}
func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()
path := r.URL.Path
http.DefaultServeMux.ServeHTTP(w, r)
duration := time.Now().Sub(start)
log.Debugf("[%s]\t%s\t%s", r.Method, path, duration)
}
func serveWeb() *http.Server {
log.Info("Listening at port: ", utils.Cfg.Port)
srv := &http.Server{
Addr: ":" + strconv.Itoa(int(utils.Cfg.Port)),
Handler: &httpHandler{},
}
go func() {
if e := srv.ListenAndServe(); e != nil {
log.Error(e.Error())
}
}()
return srv
}
func copyFile(dst, src string) error {
if dst == src {
return nil
}
in, e := os.Open(src)
if e != nil {
return e
}
defer in.Close()
out, e := os.Create(dst)
if e != nil {
return e
}
defer out.Close()
_, e = io.Copy(out, in)
return e
}
func doSetup(cfgPath string) error {
exePath, e := os.Executable()
if e != nil {
fmt.Fprintf(os.Stderr, "failed to get executable path: %s\n", e.Error())
return e
}
if !filepath.IsAbs(cfgPath) {
dir := filepath.Dir(exePath)
cfgPath = filepath.Join(dir, cfgPath)
}
e = copyFile("/etc/taos/alert.cfg", cfgPath)
if e != nil {
fmt.Fprintf(os.Stderr, "failed copy configuration file: %s\n", e.Error())
return e
}
f, e := os.Create("/etc/systemd/system/alert.service")
if e != nil {
fmt.Printf("failed to create alert service: %s\n", e.Error())
return e
}
defer f.Close()
const content = `[Unit]
Description=Alert (TDengine Alert Service)
After=syslog.target
After=network.target
[Service]
RestartSec=2s
Type=simple
WorkingDirectory=/var/lib/taos/
ExecStart=%s -cfg /etc/taos/alert.cfg
Restart=always
[Install]
WantedBy=multi-user.target
`
_, e = fmt.Fprintf(f, content, exePath)
if e != nil {
fmt.Printf("failed to create alert.service: %s\n", e.Error())
return e
}
return nil
}
const version = "TDengine alert v1.0.0"
func main() {
var (
cfgPath string
setup bool
showVersion bool
)
flag.StringVar(&cfgPath, "cfg", "alert.cfg", "path of configuration file")
flag.BoolVar(&setup, "setup", false, "setup the service as a daemon")
flag.BoolVar(&showVersion, "version", false, "show version information")
flag.Parse()
if showVersion {
fmt.Println(version)
return
}
if setup {
if runtime.GOOS == "linux" {
doSetup(cfgPath)
} else {
fmt.Fprintln(os.Stderr, "can only run as a daemon mode in linux.")
}
return
}
if e := utils.LoadConfig(cfgPath); e != nil {
fmt.Fprintln(os.Stderr, "failed to load configuration")
return
}
if e := log.Init(); e != nil {
fmt.Fprintln(os.Stderr, "failed to initialize logger:", e.Error())
return
}
defer log.Sync()
if e := models.Init(); e != nil {
log.Fatal("failed to initialize database:", e.Error())
}
if e := app.Init(); e != nil {
log.Fatal("failed to initialize application:", e.Error())
}
// start web server
srv := serveWeb()
// wait `Ctrl-C` or `Kill` to exit, `Kill` does not work on Windows
interrupt := make(chan os.Signal)
signal.Notify(interrupt, os.Interrupt)
signal.Notify(interrupt, os.Kill)
<-interrupt
fmt.Println("'Ctrl + C' received, exiting...")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
srv.Shutdown(ctx)
cancel()
app.Uninit()
models.Uninit()
}
{
"name": "CarTooFast",
"period": "10s",
"sql": "select avg(speed) as avgspeed from test.cars where ts > now - 5m group by id",
"expr": "avgSpeed > 100",
"for": "0s",
"labels": {
"ruleName": "CarTooFast"
},
"annotations": {
"summary": "car {{$values.id}} is too fast, its average speed is {{$values.avgSpeed}}km/h"
}
}
\ No newline at end of file
module github.com/taosdata/alert
go 1.14
require (
github.com/jmoiron/sqlx v1.2.0
github.com/mattn/go-sqlite3 v2.0.3+incompatible
github.com/taosdata/driver-go v0.0.0-20200311072652-8c58c512b6ac
go.uber.org/zap v1.14.1
google.golang.org/appengine v1.6.5 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
)
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U=
github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/taosdata/driver-go v0.0.0-20200311072652-8c58c512b6ac h1:uZplMwObJj8mfgI4ZvYPNHRn+fNz2leiMPqShsjtEEc=
github.com/taosdata/driver-go v0.0.0-20200311072652-8c58c512b6ac/go.mod h1:TuMZDpnBrjNO07rneM2C5qMYFqIro4aupL2cUOGGo/I=
go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.14.0 h1:/pduUoebOeeJzTDFuoMgC6nRkiasr1sBCIEorly7m4o=
go.uber.org/zap v1.14.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.14.1 h1:nYDKopTbvAPq/NrUVZwT15y2lpROBiLLyoRTbXOYWOo=
go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM=
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
package models
import (
"fmt"
"strconv"
"time"
"github.com/jmoiron/sqlx"
"github.com/taosdata/alert/utils"
"github.com/taosdata/alert/utils/log"
)
var db *sqlx.DB
func Init() error {
xdb, e := sqlx.Connect("sqlite3", utils.Cfg.Database)
if e == nil {
db = xdb
}
return upgrade()
}
func Uninit() error {
db.Close()
return nil
}
func getStringOption(tx *sqlx.Tx, name string) (string, error) {
const qs = "SELECT * FROM `option` WHERE `name`=?"
var (
e error
o struct {
Name string `db:"name"`
Value string `db:"value"`
}
)
if tx != nil {
e = tx.Get(&o, qs, name)
} else {
e = db.Get(&o, qs, name)
}
if e != nil {
return "", e
}
return o.Value, nil
}
func getIntOption(tx *sqlx.Tx, name string) (int, error) {
s, e := getStringOption(tx, name)
if e != nil {
return 0, e
}
v, e := strconv.ParseInt(s, 10, 64)
return int(v), e
}
func setOption(tx *sqlx.Tx, name string, value interface{}) error {
const qs = "REPLACE INTO `option`(`name`, `value`) VALUES(?, ?);"
var (
e error
sv string
)
switch v := value.(type) {
case time.Time:
sv = v.Format(time.RFC3339)
default:
sv = fmt.Sprint(value)
}
if tx != nil {
_, e = tx.Exec(qs, name, sv)
} else {
_, e = db.Exec(qs, name, sv)
}
return e
}
var upgradeScripts = []struct {
ver int
stmts []string
}{
{
ver: 0,
stmts: []string{
"CREATE TABLE `option`( `name` VARCHAR(63) PRIMARY KEY, `value` VARCHAR(255) NOT NULL) WITHOUT ROWID;",
"CREATE TABLE `rule`( `name` VARCHAR(63) PRIMARY KEY, `enabled` TINYINT(1) NOT NULL, `created_at` DATETIME NOT NULL, `updated_at` DATETIME NOT NULL, `content` TEXT(65535) NOT NULL);",
},
},
}
func upgrade() error {
const dbVersion = "database version"
ver, e := getIntOption(nil, dbVersion)
if e != nil { // regards all errors as schema not created
ver = -1 // set ver to -1 to execute all statements
}
tx, e := db.Beginx()
if e != nil {
return e
}
for _, us := range upgradeScripts {
if us.ver <= ver {
continue
}
log.Info("upgrading database to version: ", us.ver)
for _, s := range us.stmts {
if _, e = tx.Exec(s); e != nil {
tx.Rollback()
return e
}
}
ver = us.ver
}
if e = setOption(tx, dbVersion, ver); e != nil {
tx.Rollback()
return e
}
return tx.Commit()
}
package models
import "time"
const (
sqlSelectAllRule = "SELECT * FROM `rule`;"
sqlSelectRule = "SELECT * FROM `rule` WHERE `name` = ?;"
sqlInsertRule = "INSERT INTO `rule`(`name`, `enabled`, `created_at`, `updated_at`, `content`) VALUES(:name, :enabled, :created_at, :updated_at, :content);"
sqlUpdateRule = "UPDATE `rule` SET `content` = :content, `updated_at` = :updated_at WHERE `name` = :name;"
sqlEnableRule = "UPDATE `rule` SET `enabled` = :enabled, `updated_at` = :updated_at WHERE `name` = :name;"
sqlDeleteRule = "DELETE FROM `rule` WHERE `name` = ?;"
)
type Rule struct {
Name string `db:"name"`
Enabled bool `db:"enabled"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
Content string `db:"content"`
}
func AddRule(r *Rule) error {
r.CreatedAt = time.Now()
r.Enabled = true
r.UpdatedAt = r.CreatedAt
_, e := db.NamedExec(sqlInsertRule, r)
return e
}
func UpdateRule(name string, content string) error {
r := Rule{
Name: name,
UpdatedAt: time.Now(),
Content: content,
}
_, e := db.NamedExec(sqlUpdateRule, &r)
return e
}
func EnableRule(name string, enabled bool) error {
r := Rule{
Name: name,
Enabled: enabled,
UpdatedAt: time.Now(),
}
if res, e := db.NamedExec(sqlEnableRule, &r); e != nil {
return e
} else if n, e := res.RowsAffected(); n != 1 {
return e
}
return nil
}
func DeleteRule(name string) error {
_, e := db.Exec(sqlDeleteRule, name)
return e
}
func GetRuleByName(name string) (*Rule, error) {
r := Rule{}
if e := db.Get(&r, sqlSelectRule, name); e != nil {
return nil, e
}
return &r, nil
}
func LoadAllRule() ([]Rule, error) {
var rules []Rule
if e := db.Select(&rules, sqlSelectAllRule); e != nil {
return nil, e
}
return rules, nil
}
set -e
# releash.sh -c [arm | arm64 | amd64 | 386]
# -o [linux | darwin | windows]
# set parameters by default value
cpuType=amd64 # [arm | arm64 | amd64 | 386]
osType=linux # [linux | darwin | windows]
while getopts "h:c:o:" arg
do
case $arg in
c)
#echo "cpuType=$OPTARG"
cpuType=$(echo $OPTARG)
;;
o)
#echo "osType=$OPTARG"
osType=$(echo $OPTARG)
;;
h)
echo "Usage: `basename $0` -c [arm | arm64 | amd64 | 386] -o [linux | darwin | windows]"
exit 0
;;
?) #unknown option
echo "unknown argument"
exit 1
;;
esac
done
startdir=$(pwd)
scriptdir=$(dirname $(readlink -f $0))
cd ${scriptdir}/cmd/alert
version=$(grep 'const version =' main.go | awk '{print $NF}')
version=${version%\"}
echo "cpuType=${cpuType}"
echo "osType=${osType}"
echo "version=${version}"
GOOS=${osType} GOARCH=${cpuType} go build
GZIP=-9 tar -zcf ${startdir}/alert-${version}-${osType}-${cpuType}.tar.gz alert alert.cfg
sql connect
sleep 100
sql drop database if exists test
sql create database test
sql use test
print ====== create super table
sql create table cars (ts timestamp, speed int) tags(id int)
print ====== create tables
$i = 0
while $i < 5
$tb = car . $i
sql create table $tb using cars tags( $i )
$i = $i + 1
endw
{
"name": "test1",
"period": "10s",
"sql": "select avg(speed) as avgspeed from test.cars group by id",
"expr": "avgSpeed >= 3",
"for": "0s",
"labels": {
"ruleName": "test1"
},
"annotations": {
"summary": "speed of car(id = {{$labels.id}}) is too high: {{$values.avgSpeed}}"
}
}
\ No newline at end of file
sql connect
sleep 100
print ====== insert 10 records to table 0
$i = 10
while $i > 0
$ms = $i . s
sql insert into test.car0 values(now - $ms , 1)
$i = $i - 1
endw
sql connect
sleep 100
print ====== insert another records table 0
sql insert into test.car0 values(now , 100)
sql connect
sleep 100
print ====== insert 10 records to table 0, 1, 2
$i = 10
while $i > 0
$ms = $i . s
sql insert into test.car0 values(now - $ms , 1)
sql insert into test.car1 values(now - $ms , $i )
sql insert into test.car2 values(now - $ms , 10)
$i = $i - 1
endw
# wait until $1 alerts are generates, and at most wait $2 seconds
# return 0 if wait succeeded, 1 if wait timeout
function waitAlert() {
local i=0
while [ $i -lt $2 ]; do
local c=$(wc -l alert.out | awk '{print $1}')
if [ $c -ge $1 ]; then
return 0
fi
let "i=$i+1"
sleep 1s
done
return 1
}
# prepare environment
kill -INT `ps aux | grep 'alert -cfg' | grep -v grep | awk '{print $2}'`
rm -f alert.db
rm -f alert.out
../cmd/alert/alert -cfg ../cmd/alert/alert.cfg > alert.out &
../../td/debug/build/bin/tsim -c /etc/taos -f ./prepare.sim
# add a rule to alert application
curl -d '@rule.json' http://localhost:8100/api/update-rule
# step 1: add some data but not trigger an alert
../../td/debug/build/bin/tsim -c /etc/taos -f ./step1.sim
# wait 20 seconds, should not get an alert
waitAlert 1 20
res=$?
if [ $res -eq 0 ]; then
echo 'should not have alerts here'
exit 1
fi
# step 2: trigger an alert
../../td/debug/build/bin/tsim -c /etc/taos -f ./step2.sim
# wait 30 seconds for the alert
waitAlert 1 30
res=$?
if [ $res -eq 1 ]; then
echo 'there should be an alert now'
exit 1
fi
# compare whether the generate alert meet expectation
diff <(uniq alert.out | sed -n 1p | jq -cS 'del(.startsAt, .endsAt)') <(jq -cSn '{"values":{"avgspeed":10,"id":0},"labels":{"id":"0","ruleName":"test1"},"annotations":{"summary":"speed of car(id = 0) is too high: 10"}}')
if [ $? -ne 0 ]; then
echo 'the generated alert does not meet expectation'
exit 1
fi
# step 3: add more data, trigger another 3 alerts
../../td/debug/build/bin/tsim -c /etc/taos -f ./step3.sim
# wait 30 seconds for the alerts
waitAlert 4 30
res=$?
if [ $res -eq 1 ]; then
echo 'there should be 4 alerts now'
exit 1
fi
# compare whether the generate alert meet expectation
diff <(uniq alert.out | sed -n 2p | jq -cS 'del(.startsAt, .endsAt)') <(jq -cSn '{"annotations":{"summary":"speed of car(id = 0) is too high: 5.714285714285714"},"labels":{"id":"0","ruleName":"test1"},"values":{"avgspeed":5.714285714285714,"id":0}}')
if [ $? -ne 0 ]; then
echo 'the generated alert does not meet expectation'
exit 1
fi
diff <(uniq alert.out | sed -n 3p | jq -cS 'del(.startsAt, .endsAt)') <(jq -cSn '{"annotations":{"summary":"speed of car(id = 1) is too high: 5.5"},"labels":{"id":"1","ruleName":"test1"},"values":{"avgspeed":5.5,"id":1}}')
if [ $? -ne 0 ]; then
echo 'the generated alert does not meet expectation'
exit 1
fi
diff <(uniq alert.out | sed -n 4p | jq -cS 'del(.startsAt, .endsAt)') <(jq -cSn '{"annotations":{"summary":"speed of car(id = 2) is too high: 10"},"labels":{"id":"2","ruleName":"test1"},"values":{"avgspeed":10,"id":2}}')
if [ $? -ne 0 ]; then
echo 'the generated alert does not meet expectation'
exit 1
fi
kill -INT `ps aux | grep 'alert -cfg' | grep -v grep | awk '{print $2}'`
package utils
import (
"encoding/json"
"os"
"gopkg.in/yaml.v3"
)
type Config struct {
Port uint16 `json:"port,omitempty" yaml:"port,omitempty"`
Database string `json:"database,omitempty" yaml:"database,omitempty"`
RuleFile string `json:"ruleFile,omitempty" yaml:"ruleFile,omitempty"`
Log struct {
Level string `json:"level,omitempty" yaml:"level,omitempty"`
Path string `json:"path,omitempty" yaml:"path,omitempty"`
} `json:"log" yaml:"log"`
TDengine string `json:"tdengine,omitempty" yaml:"tdengine,omitempty"`
Receivers struct {
AlertManager string `json:"alertManager,omitempty" yaml:"alertManager,omitempty"`
Console bool `json:"console"`
} `json:"receivers" yaml:"receivers"`
}
var Cfg Config
func LoadConfig(path string) error {
f, e := os.Open(path)
if e != nil {
return e
}
defer f.Close()
e = yaml.NewDecoder(f).Decode(&Cfg)
if e != nil {
f.Seek(0, 0)
e = json.NewDecoder(f).Decode(&Cfg)
}
return e
}
package log
import (
"github.com/taosdata/alert/utils"
"go.uber.org/zap"
)
var logger *zap.SugaredLogger
func Init() error {
var cfg zap.Config
if utils.Cfg.Log.Level == "debug" {
cfg = zap.NewDevelopmentConfig()
} else {
cfg = zap.NewProductionConfig()
}
if len(utils.Cfg.Log.Path) > 0 {
cfg.OutputPaths = []string{utils.Cfg.Log.Path}
}
l, e := cfg.Build()
if e != nil {
return e
}
logger = l.Sugar()
return nil
}
// Debug package logger
func Debug(args ...interface{}) {
logger.Debug(args...)
}
// Debugf package logger
func Debugf(template string, args ...interface{}) {
logger.Debugf(template, args...)
}
// Info package logger
func Info(args ...interface{}) {
logger.Info(args...)
}
// Infof package logger
func Infof(template string, args ...interface{}) {
logger.Infof(template, args...)
}
// Warn package logger
func Warn(args ...interface{}) {
logger.Warn(args...)
}
// Warnf package logger
func Warnf(template string, args ...interface{}) {
logger.Warnf(template, args...)
}
// Error package logger
func Error(args ...interface{}) {
logger.Error(args...)
}
// Errorf package logger
func Errorf(template string, args ...interface{}) {
logger.Errorf(template, args...)
}
// Fatal package logger
func Fatal(args ...interface{}) {
logger.Fatal(args...)
}
// Fatalf package logger
func Fatalf(template string, args ...interface{}) {
logger.Fatalf(template, args...)
}
// Panic package logger
func Panic(args ...interface{}) {
logger.Panic(args...)
}
// Panicf package logger
func Panicf(template string, args ...interface{}) {
logger.Panicf(template, args...)
}
func Sync() error {
return logger.Sync()
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册