Commit 674c76ad authored by Simon Pasquier, committed by Brian Brazil

discovery: coalesce identical SD configurations (#3912)

* discovery: coalesce identical SD configurations

Instead of creating as many SD providers as declared in the
configuration, the discovery manager merges identical configurations
into the same provider and keeps track of the subscribers. When
the manager receives target updates from a SD provider, it will
broadcast the updates to all interested subscribers.
Signed-off-by: Simon Pasquier <spasquie@redhat.com>
Parent 75bd3481
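The deduplication strategy described above can be sketched in isolation as follows. This is a hedged illustration only, not the manager's actual code; the provider struct, coalesce function, and fileSD type are hypothetical stand-ins for the real types in the diff below.

package main

import (
	"fmt"
	"reflect"
)

// provider mirrors the concept introduced by this commit: one Discoverer
// instance shared by every scrape job ("subscriber") with an identical config.
type provider struct {
	config interface{}
	subs   []string
}

// coalesce registers cfg for job, reusing an existing provider whose
// configuration is deeply equal instead of creating a new one.
func coalesce(providers []*provider, cfg interface{}, job string) []*provider {
	for _, p := range providers {
		if reflect.DeepEqual(cfg, p.config) {
			p.subs = append(p.subs, job)
			return providers
		}
	}
	return append(providers, &provider{config: cfg, subs: []string{job}})
}

func main() {
	type fileSD struct{ Files []string }
	var ps []*provider
	ps = coalesce(ps, fileSD{Files: []string{"targets.json"}}, "prometheus")
	ps = coalesce(ps, fileSD{Files: []string{"targets.json"}}, "prometheus2")
	fmt.Println(len(ps), ps[0].subs) // prints: 1 [prometheus prometheus2]
}

Using reflect.DeepEqual keeps the comparison generic across all SD configuration types without requiring each type to implement its own equality method.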
@@ -16,11 +16,13 @@ package discovery
import (
"context"
"fmt"
"reflect"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
@@ -38,6 +40,19 @@ import (
"github.com/prometheus/prometheus/discovery/zookeeper"
)
var (
failedConfigs = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_sd_configs_failed_total",
Help: "Total number of service discovery configurations that failed to load.",
},
)
)
func init() {
prometheus.MustRegister(failedConfigs)
}
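For reference, here is a hedged sketch of how the new counter could be checked from a test in the same package. It uses client_golang's testutil helper, which may postdate the client_golang version vendored at the time of this commit; the test itself is not part of the commit.

package discovery

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

// TestFailedConfigsCounter is a hypothetical test: it reads the counter
// back via testutil and checks that an increment is observable.
func TestFailedConfigsCounter(t *testing.T) {
	before := testutil.ToFloat64(failedConfigs)
	failedConfigs.Inc() // registerProviders does this when a Discoverer constructor returns an error
	if got := testutil.ToFloat64(failedConfigs); got != before+1 {
		t.Fatalf("expected counter to be %v, got %v", before+1, got)
	}
}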
// Discoverer provides information about target groups. It maintains a set
// of sources from which TargetGroups can originate. Whenever a discovery provider
// detects a potential change, it sends the TargetGroup through its channel.
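To make the Discoverer contract concrete, below is a minimal, hypothetical implementation: it emits a single fixed target group and then blocks until its context is canceled. The type name and target address are invented for illustration.

package discovery

import (
	"context"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/discovery/targetgroup"
)

// exampleDiscoverer is a hypothetical Discoverer: it sends one fixed
// target group and then waits for cancellation.
type exampleDiscoverer struct{}

func (exampleDiscoverer) Run(ctx context.Context, up chan<- []*targetgroup.Group) {
	tg := &targetgroup.Group{
		Source:  "example",
		Targets: []model.LabelSet{{model.AddressLabel: "localhost:9100"}},
	}
	select {
	case up <- []*targetgroup.Group{tg}:
	case <-ctx.Done():
		return
	}
	<-ctx.Done() // a real implementation would keep sending updates here
}

Such a Discoverer could be wired into the manager through StartCustomProvider, shown further down.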
@@ -59,8 +74,19 @@ type poolKey struct {
provider string
}
// provider holds a Discoverer instance, its configuration and its subscribers.
type provider struct {
name string
d Discoverer
subs []string
config interface{}
}
// NewManager is the Discovery Manager constructor
func NewManager(ctx context.Context, logger log.Logger) *Manager {
if logger == nil {
logger = log.NewNopLogger()
}
return &Manager{
logger: logger,
syncCh: make(chan map[string][]*targetgroup.Group),
@@ -77,9 +103,12 @@ type Manager struct {
mtx sync.RWMutex
ctx context.Context
discoverCancel []context.CancelFunc
// Some Discoverers (e.g. k8s) send only the updates for a given target group,
// so we use map[tg.Source]*targetgroup.Group to know which group to update.
targets map[poolKey]map[string]*targetgroup.Group
// providers keeps track of SD providers.
providers []*provider
// The sync channel sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
syncCh chan map[string][]*targetgroup.Group
}
@@ -105,9 +134,10 @@ func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) e
m.cancelDiscoverers()
for name, scfg := range cfg {
for provName, prov := range m.providersFromConfig(scfg) {
m.startProvider(m.ctx, poolKey{setName: name, provider: provName}, prov)
}
m.registerProviders(scfg, name)
}
for _, prov := range m.providers {
m.startProvider(m.ctx, prov)
}
return nil
@@ -115,22 +145,27 @@ func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) e
// StartCustomProvider is used for sdtool. Only use this if you know what you're doing.
func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker Discoverer) {
// Pool keys for non-standard SD implementations are unknown.
poolKey := poolKey{setName: name, provider: name}
m.startProvider(ctx, poolKey, worker)
p := &provider{
name: name,
d: worker,
subs: []string{name},
}
m.providers = append(m.providers, p)
m.startProvider(ctx, p)
}
func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) {
func (m *Manager) startProvider(ctx context.Context, p *provider) {
level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*targetgroup.Group)
m.discoverCancel = append(m.discoverCancel, cancel)
go worker.Run(ctx, updates)
go m.runProvider(ctx, poolKey, updates)
go p.d.Run(ctx, updates)
go m.runProvider(ctx, p, updates)
}
func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*targetgroup.Group) {
func (m *Manager) runProvider(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
@@ -152,7 +187,9 @@ func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan
}
return
}
m.updateGroup(poolKey, tgs)
for _, s := range p.subs {
m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
}
// Signal that there was an update.
select {
@@ -178,6 +215,7 @@ func (m *Manager) cancelDiscoverers() {
c()
}
m.targets = make(map[poolKey]map[string]*targetgroup.Group)
m.providers = nil
m.discoverCancel = nil
}
@@ -210,85 +248,97 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group {
return tSets
}
func (m *Manager) providersFromConfig(cfg sd_config.ServiceDiscoveryConfig) map[string]Discoverer {
providers := map[string]Discoverer{}
func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setName string) {
add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) {
t := reflect.TypeOf(cfg).String()
for _, p := range m.providers {
if reflect.DeepEqual(cfg, p.config) {
p.subs = append(p.subs, setName)
return
}
}
d, err := newDiscoverer()
if err != nil {
level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", t)
failedConfigs.Inc()
return
}
app := func(mech string, i int, tp Discoverer) {
providers[fmt.Sprintf("%s/%d", mech, i)] = tp
provider := provider{
name: fmt.Sprintf("%s/%d", t, len(m.providers)),
d: d,
config: cfg,
subs: []string{setName},
}
m.providers = append(m.providers, &provider)
}
for i, c := range cfg.DNSSDConfigs {
app("dns", i, dns.NewDiscovery(*c, log.With(m.logger, "discovery", "dns")))
for _, c := range cfg.DNSSDConfigs {
add(c, func() (Discoverer, error) {
return dns.NewDiscovery(*c, log.With(m.logger, "discovery", "dns")), nil
})
}
for i, c := range cfg.FileSDConfigs {
app("file", i, file.NewDiscovery(c, log.With(m.logger, "discovery", "file")))
for _, c := range cfg.FileSDConfigs {
add(c, func() (Discoverer, error) {
return file.NewDiscovery(c, log.With(m.logger, "discovery", "file")), nil
})
}
for i, c := range cfg.ConsulSDConfigs {
k, err := consul.NewDiscovery(c, log.With(m.logger, "discovery", "consul"))
if err != nil {
level.Error(m.logger).Log("msg", "Cannot create Consul discovery", "err", err)
continue
}
app("consul", i, k)
for _, c := range cfg.ConsulSDConfigs {
add(c, func() (Discoverer, error) {
return consul.NewDiscovery(c, log.With(m.logger, "discovery", "consul"))
})
}
for i, c := range cfg.MarathonSDConfigs {
t, err := marathon.NewDiscovery(*c, log.With(m.logger, "discovery", "marathon"))
if err != nil {
level.Error(m.logger).Log("msg", "Cannot create Marathon discovery", "err", err)
continue
}
app("marathon", i, t)
for _, c := range cfg.MarathonSDConfigs {
add(c, func() (Discoverer, error) {
return marathon.NewDiscovery(*c, log.With(m.logger, "discovery", "marathon"))
})
}
for i, c := range cfg.KubernetesSDConfigs {
k, err := kubernetes.New(log.With(m.logger, "discovery", "k8s"), c)
if err != nil {
level.Error(m.logger).Log("msg", "Cannot create Kubernetes discovery", "err", err)
continue
}
app("kubernetes", i, k)
for _, c := range cfg.KubernetesSDConfigs {
add(c, func() (Discoverer, error) {
return kubernetes.New(log.With(m.logger, "discovery", "k8s"), c)
})
}
for i, c := range cfg.ServersetSDConfigs {
app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(m.logger, "discovery", "zookeeper")))
for _, c := range cfg.ServersetSDConfigs {
add(c, func() (Discoverer, error) {
return zookeeper.NewServersetDiscovery(c, log.With(m.logger, "discovery", "zookeeper")), nil
})
}
for i, c := range cfg.NerveSDConfigs {
app("nerve", i, zookeeper.NewNerveDiscovery(c, log.With(m.logger, "discovery", "nerve")))
for _, c := range cfg.NerveSDConfigs {
add(c, func() (Discoverer, error) {
return zookeeper.NewNerveDiscovery(c, log.With(m.logger, "discovery", "nerve")), nil
})
}
for i, c := range cfg.EC2SDConfigs {
app("ec2", i, ec2.NewDiscovery(c, log.With(m.logger, "discovery", "ec2")))
for _, c := range cfg.EC2SDConfigs {
add(c, func() (Discoverer, error) {
return ec2.NewDiscovery(c, log.With(m.logger, "discovery", "ec2")), nil
})
}
for i, c := range cfg.OpenstackSDConfigs {
openstackd, err := openstack.NewDiscovery(c, log.With(m.logger, "discovery", "openstack"))
if err != nil {
level.Error(m.logger).Log("msg", "Cannot initialize OpenStack discovery", "err", err)
continue
}
app("openstack", i, openstackd)
for _, c := range cfg.OpenstackSDConfigs {
add(c, func() (Discoverer, error) {
return openstack.NewDiscovery(c, log.With(m.logger, "discovery", "openstack"))
})
}
for i, c := range cfg.GCESDConfigs {
gced, err := gce.NewDiscovery(*c, log.With(m.logger, "discovery", "gce"))
if err != nil {
level.Error(m.logger).Log("msg", "Cannot initialize GCE discovery", "err", err)
continue
}
app("gce", i, gced)
for _, c := range cfg.GCESDConfigs {
add(c, func() (Discoverer, error) {
return gce.NewDiscovery(*c, log.With(m.logger, "discovery", "gce"))
})
}
for i, c := range cfg.AzureSDConfigs {
app("azure", i, azure.NewDiscovery(c, log.With(m.logger, "discovery", "azure")))
for _, c := range cfg.AzureSDConfigs {
add(c, func() (Discoverer, error) {
return azure.NewDiscovery(c, log.With(m.logger, "discovery", "azure")), nil
})
}
for i, c := range cfg.TritonSDConfigs {
t, err := triton.New(log.With(m.logger, "discovery", "trition"), c)
if err != nil {
level.Error(m.logger).Log("msg", "Cannot create Triton discovery", "err", err)
continue
}
app("triton", i, t)
for _, c := range cfg.TritonSDConfigs {
add(c, func() (Discoverer, error) {
return triton.New(log.With(m.logger, "discovery", "triton"), c)
})
}
if len(cfg.StaticConfigs) > 0 {
app("static", 0, &StaticProvider{cfg.StaticConfigs})
add(setName, func() (Discoverer, error) {
return &StaticProvider{cfg.StaticConfigs}, nil
})
}
return providers
}
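Note the naming scheme above: a provider's name is now derived from the Go type of its configuration plus the provider's index (fmt.Sprintf("%s/%d", t, len(m.providers))), rather than the old mechanism/index pair. Since static configurations pass setName (a plain string) as the config value, their provider is named "string/0", which is why the updated tests below check for "string/0" instead of "static/0". A small standalone sketch of the rule:

package main

import (
	"fmt"
	"reflect"
)

func main() {
	// Static configs register the job name (a plain string) as their config,
	// so the first provider is named "string/0"; a *file.SDConfig would
	// yield "*file.SDConfig/0" by the same rule.
	setName := "prometheus"
	fmt.Printf("%s/%d\n", reflect.TypeOf(setName).String(), 0) // string/0
}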
// StaticProvider holds a list of target groups that never change.
......
@@ -16,6 +16,8 @@ package discovery
import (
"context"
"fmt"
"io/ioutil"
"os"
"reflect"
"sort"
"strconv"
@@ -32,7 +34,7 @@ import (
// TestTargetUpdatesOrder checks that the target updates are received in the expected order.
func TestTargetUpdatesOrder(t *testing.T) {
// The order by which the updates are send is detirmened by the interval passed to the mock discovery adapter
// The order by which the updates are send is determined by the interval passed to the mock discovery adapter
// Final targets array is ordered alphabetically by the name of the discoverer.
// For example discoverer "A" with targets "t2,t3" and discoverer "B" with targets "t1,t2" will result in "t2,t3,t1,t2" after the merge.
testCases := []struct {
@@ -699,34 +701,34 @@
}
}
func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
verifyPresence := func(tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) {
func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) {
if _, ok := tSets[poolKey]; !ok {
t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets)
return
}
match := false
var mergedTargets string
for _, targetGroup := range tSets[poolKey] {
for _, l := range targetGroup.Targets {
mergedTargets = mergedTargets + " " + l.String()
if l.String() == label {
match = true
}
}
}
if match != present {
msg := ""
if !present {
msg = "not"
}
t.Fatalf("'%s' should %s be present in Targets labels: %v", label, msg, mergedTargets)
}
}
func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
cfg := &config.Config{}
sOne := `
@@ -751,8 +753,8 @@ scrape_configs:
discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh()
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true)
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"bar:9090\"}", true)
sTwo := `
scrape_configs:
@@ -770,8 +772,60 @@ scrape_configs:
discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh()
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false)
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"bar:9090\"}", false)
}
func TestIdenticalConfigurationsAreCoalesced(t *testing.T) {
tmpFile, err := ioutil.TempFile("", "sd")
if err != nil {
t.Fatalf("error creating temporary file: %v", err)
}
defer os.Remove(tmpFile.Name())
if _, err := tmpFile.Write([]byte(`[{"targets": ["foo:9090"]}]`)); err != nil {
t.Fatalf("error writing temporary file: %v", err)
}
if err := tmpFile.Close(); err != nil {
t.Fatalf("error closing temporary file: %v", err)
}
tmpFile2 := fmt.Sprintf("%s.json", tmpFile.Name())
if err = os.Link(tmpFile.Name(), tmpFile2); err != nil {
t.Fatalf("error linking temporary file: %v", err)
}
defer os.Remove(tmpFile2)
cfg := &config.Config{}
sOne := `
scrape_configs:
- job_name: 'prometheus'
file_sd_configs:
- files: ["%s"]
- job_name: 'prometheus2'
file_sd_configs:
- files: ["%s"]
`
sOne = fmt.Sprintf(sOne, tmpFile2, tmpFile2)
if err := yaml.UnmarshalStrict([]byte(sOne), cfg); err != nil {
t.Fatalf("Unable to load YAML config sOne: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(ctx, nil)
go discoveryManager.Run()
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfig
}
discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh()
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "*file.SDConfig/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus2", provider: "*file.SDConfig/0"}, "{__address__=\"foo:9090\"}", true)
if len(discoveryManager.providers) != 1 {
t.Fatalf("Invalid number of providers: expected 1, got %d", len(discoveryManager.providers))
}
}
func TestApplyConfigDoesNotModifyStaticProviderTargets(t *testing.T) {
......