aggregator_test.go 2.5 KB
Newer Older
O
ob-robot 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
package aggregate

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"gopkg.in/yaml.v3"

	"github.com/oceanbase/obagent/monitor/message"
)

// newTestMetric builds the fixture message shared by the tests in this file:
// a message named "test" of type message.Gauge, timestamped with the current
// time, carrying tags k1=v1 and k2=v2 and fields f1=1.0 and f2=2.0.
func newTestMetric() *message.Message {
	return message.NewMessage("test", message.Gauge, time.Now()).
		AddTag("k1", "v1").
		AddTag("k2", "v2").
		AddField("f1", 1.0).
		AddField("f2", 2.0)
}

// newTestMetrics returns count fixture messages, each created by its own
// newTestMetric call. The returned slice is nil when count is zero or negative.
func newTestMetrics(count int) []*message.Message {
	var batch []*message.Message
	for remaining := count; remaining > 0; remaining-- {
		batch = append(batch, newTestMetric())
	}
	return batch
}

// TestDuplicate runs two identical "test" metrics through an Aggregator
// configured with a single rule on tag k1, and expects exactly one output
// message whose f1 field is the float64 value 1.0.
func TestDuplicate(t *testing.T) {
	metrics := newTestMetrics(2)
	configStr := `
        rules:
          - metric: test
            tags: [ k1 ]
    `
	var aggregatorConfigMap map[string]interface{}
	// Fail immediately on a malformed config rather than letting it surface
	// later as a confusing length or field assertion.
	require.NoError(t, yaml.Unmarshal([]byte(configStr), &aggregatorConfigMap))

	aggregator := &Aggregator{}
	// NOTE(review): Init's result (if it returns one) is not checked here;
	// confirm its signature and assert on it if it reports errors.
	aggregator.Init(context.Background(), aggregatorConfigMap)

	metricsProcessed, err := aggregator.Process(context.Background(), metrics...)
	require.NoError(t, err)
	// require.Len stops the test before the [0] index below can panic.
	require.Len(t, metricsProcessed, 1)

	v, exists := metricsProcessed[0].GetField("f1")
	require.True(t, exists)
	f, ok := v.(float64)
	require.True(t, ok)
	require.Equal(t, 1.0, f)
}

// TestMatch runs two "test" metrics that differ only in tag k2 through an
// Aggregator configured with a single rule on tag k1, and expects exactly one
// output message whose f1 field is the float64 value 2.0 (presumably the two
// inputs' f1 values combined; see the Aggregator implementation).
func TestMatch(t *testing.T) {
	metrics := newTestMetrics(2)
	metrics[1].SetTag("k2", "vvvvv")
	configStr := `
        rules:
          - metric: test
            tags: [ k1 ]
    `
	var aggregatorConfigMap map[string]interface{}
	// Fail immediately on a malformed config rather than letting it surface
	// later as a confusing length or field assertion.
	require.NoError(t, yaml.Unmarshal([]byte(configStr), &aggregatorConfigMap))

	aggregator := &Aggregator{}
	// NOTE(review): Init's result (if it returns one) is not checked here;
	// confirm its signature and assert on it if it reports errors.
	aggregator.Init(context.Background(), aggregatorConfigMap)

	metricsProcessed, err := aggregator.Process(context.Background(), metrics...)
	require.NoError(t, err)
	// require.Len stops the test before the [0] index below can panic.
	require.Len(t, metricsProcessed, 1)

	v, exists := metricsProcessed[0].GetField("f1")
	require.True(t, exists)
	f, ok := v.(float64)
	require.True(t, ok)
	require.Equal(t, 2.0, f)
}

// TestNotMatch runs two "test" metrics that differ in tag k2 through an
// Aggregator whose only rule names tag t1, a tag the fixture metrics do not
// carry. It expects both messages to come out, the first with f1 == 1.0.
//
// The test is currently skipped unconditionally.
func TestNotMatch(t *testing.T) {
	// NOTE(review): the reason for skipping is not recorded; add a message
	// to t.Skip or re-enable the test once the cause is known.
	t.Skip()
	metrics := newTestMetrics(2)
	metrics[1].SetTag("k2", "vvvvv")
	configStr := `
        rules:
          - metric: test
            tags: [ t1 ]
    `
	var aggregatorConfigMap map[string]interface{}
	// Fail immediately on a malformed config rather than letting it surface
	// later as a confusing length or field assertion.
	require.NoError(t, yaml.Unmarshal([]byte(configStr), &aggregatorConfigMap))

	aggregator := &Aggregator{}
	// NOTE(review): Init's result (if it returns one) is not checked here;
	// confirm its signature and assert on it if it reports errors.
	aggregator.Init(context.Background(), aggregatorConfigMap)

	metricsProcessed, err := aggregator.Process(context.Background(), metrics...)
	require.NoError(t, err)
	// require.Len stops the test before the [0] index below can panic.
	require.Len(t, metricsProcessed, 2)

	v, exists := metricsProcessed[0].GetField("f1")
	require.True(t, exists)
	f, ok := v.(float64)
	require.True(t, ok)
	require.Equal(t, 1.0, f)
}