提交 9f8feb9f 编写于 作者: F Fabian Reinartz

discovery: consolidate Marathon SD files

上级 25775b35
......@@ -16,81 +16,16 @@ package discovery
import (
"time"
"github.com/prometheus/common/log"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
)
// MarathonDiscovery provides service discovery based on a Marathon instance.
type MarathonDiscovery struct {
servers []string
refreshInterval time.Duration
done chan struct{}
lastRefresh map[string]*config.TargetGroup
client marathon.AppListClient
}
// NewMarathonDiscovery creates a new Marathon based discovery.
func NewMarathonDiscovery(conf *config.MarathonSDConfig) *MarathonDiscovery {
return &MarathonDiscovery{
servers: conf.Servers,
refreshInterval: time.Duration(conf.RefreshInterval),
done: make(chan struct{}),
client: marathon.FetchMarathonApps,
}
}
// Run implements the TargetProvider interface.
func (md *MarathonDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch)
for {
select {
case <-ctx.Done():
return
case <-time.After(md.refreshInterval):
err := md.updateServices(ch)
if err != nil {
log.Errorf("Error while updating services: %s", err)
}
}
}
}
func (md *MarathonDiscovery) updateServices(ch chan<- []*config.TargetGroup) error {
targetMap, err := md.fetchTargetGroups()
if err != nil {
return err
// NewMarathon creates a new Marathon based discovery.
func NewMarathon(conf *config.MarathonSDConfig) *marathon.Discovery {
return &marathon.Discovery{
Servers: conf.Servers,
RefreshInterval: time.Duration(conf.RefreshInterval),
Done: make(chan struct{}),
Client: marathon.FetchApps,
}
all := make([]*config.TargetGroup, 0, len(targetMap))
for _, tg := range targetMap {
all = append(all, tg)
}
ch <- all
// Remove services which did disappear
for source := range md.lastRefresh {
_, ok := targetMap[source]
if !ok {
log.Debugf("Removing group for %s", source)
ch <- []*config.TargetGroup{{Source: source}}
}
}
md.lastRefresh = targetMap
return nil
}
func (md *MarathonDiscovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) {
url := marathon.RandomAppsURL(md.servers)
apps, err := md.client(url)
if err != nil {
return nil, err
}
groups := marathon.AppsToTargetGroups(apps)
return groups, nil
}
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package marathon
import (
"encoding/json"
"io/ioutil"
"net/http"
)
// AppListClient defines a function that can be used to get an application list from marathon.
type AppListClient func(url string) (*AppList, error)
// FetchMarathonApps requests a list of applications from a marathon server.
func FetchMarathonApps(url string) (*AppList, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return parseAppJSON(body)
}
func parseAppJSON(body []byte) (*AppList, error) {
apps := &AppList{}
err := json.Unmarshal(body, apps)
if err != nil {
return nil, err
}
return apps, nil
}
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package marathon
import (
"github.com/prometheus/common/model"
)
const (
// metaLabelPrefix is the meta prefix used for all meta labels in this discovery.
metaLabelPrefix = model.MetaLabelPrefix + "marathon_"
// appLabelPrefix is the prefix for the application labels.
appLabelPrefix = metaLabelPrefix + "app_label_"
// appLabel is used for the name of the app in Marathon.
appLabel model.LabelName = metaLabelPrefix + "app"
// imageLabel is the label that is used for the docker image running the service.
imageLabel model.LabelName = metaLabelPrefix + "image"
// taskLabel contains the mesos task name of the app instance.
taskLabel model.LabelName = metaLabelPrefix + "task"
)
// Copyright 2015 The Prometheus Authors
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
......@@ -14,13 +14,164 @@
package marathon
import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"time"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
)
const (
// metaLabelPrefix is the meta prefix used for all meta labels in this discovery.
metaLabelPrefix = model.MetaLabelPrefix + "marathon_"
// appLabelPrefix is the prefix for the application labels.
appLabelPrefix = metaLabelPrefix + "app_label_"
// appLabel is used for the name of the app in Marathon.
appLabel model.LabelName = metaLabelPrefix + "app"
// imageLabel is the label that is used for the docker image running the service.
imageLabel model.LabelName = metaLabelPrefix + "image"
// taskLabel contains the mesos task name of the app instance.
taskLabel model.LabelName = metaLabelPrefix + "task"
)
const appListPath string = "/v2/apps/?embed=apps.tasks"
// Discovery provides service discovery based on a Marathon instance.
type Discovery struct {
Servers []string
RefreshInterval time.Duration
Done chan struct{}
lastRefresh map[string]*config.TargetGroup
Client AppListClient
}
// Run implements the TargetProvider interface.
func (md *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch)
for {
select {
case <-ctx.Done():
return
case <-time.After(md.RefreshInterval):
err := md.updateServices(ch)
if err != nil {
log.Errorf("Error while updating services: %s", err)
}
}
}
}
func (md *Discovery) updateServices(ch chan<- []*config.TargetGroup) error {
targetMap, err := md.fetchTargetGroups()
if err != nil {
return err
}
all := make([]*config.TargetGroup, 0, len(targetMap))
for _, tg := range targetMap {
all = append(all, tg)
}
ch <- all
// Remove services which did disappear
for source := range md.lastRefresh {
_, ok := targetMap[source]
if !ok {
log.Debugf("Removing group for %s", source)
ch <- []*config.TargetGroup{{Source: source}}
}
}
md.lastRefresh = targetMap
return nil
}
func (md *Discovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) {
url := RandomAppsURL(md.Servers)
apps, err := md.Client(url)
if err != nil {
return nil, err
}
groups := AppsToTargetGroups(apps)
return groups, nil
}
// Task describes one instance of a service running on Marathon.
type Task struct {
ID string `json:"id"`
Host string `json:"host"`
Ports []uint32 `json:"ports"`
}
// DockerContainer describes a container which uses the docker runtime.
type DockerContainer struct {
Image string `json:"image"`
}
// Container describes the runtime an app in running in.
type Container struct {
Docker DockerContainer `json:"docker"`
}
// App describes a service running on Marathon.
type App struct {
ID string `json:"id"`
Tasks []Task `json:"tasks"`
RunningTasks int `json:"tasksRunning"`
Labels map[string]string `json:"labels"`
Container Container `json:"container"`
}
// AppList is a list of Marathon apps.
type AppList struct {
Apps []App `json:"apps"`
}
// AppListClient defines a function that can be used to get an application list from marathon.
type AppListClient func(url string) (*AppList, error)
// FetchApps requests a list of applications from a marathon server.
func FetchApps(url string) (*AppList, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return parseAppJSON(body)
}
func parseAppJSON(body []byte) (*AppList, error) {
apps := &AppList{}
err := json.Unmarshal(body, apps)
if err != nil {
return nil, err
}
return apps, nil
}
// RandomAppsURL randomly selects a server from an array and creates
// an URL pointing to the app list.
func RandomAppsURL(servers []string) string {
// TODO: If possible update server list from Marathon at some point.
server := servers[rand.Intn(len(servers))]
return fmt.Sprintf("%s%s", server, appListPath)
}
// AppsToTargetGroups takes an array of Marathon apps and converts them into target groups.
func AppsToTargetGroups(apps *AppList) map[string]*config.TargetGroup {
tgroups := map[string]*config.TargetGroup{}
......
......@@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
package marathon
import (
"errors"
......@@ -22,23 +22,22 @@ import (
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
)
var marathonValidLabel = map[string]string{"prometheus": "yes"}
func newTestDiscovery(client marathon.AppListClient) (chan []*config.TargetGroup, *MarathonDiscovery) {
func newTestDiscovery(client AppListClient) (chan []*config.TargetGroup, *Discovery) {
ch := make(chan []*config.TargetGroup)
md := NewMarathonDiscovery(&config.MarathonSDConfig{
md := &Discovery{
Servers: []string{"http://localhost:8080"},
})
md.client = client
}
md.Client = client
return ch, md
}
func TestMarathonSDHandleError(t *testing.T) {
var errTesting = errors.New("testing failure")
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
ch, md := newTestDiscovery(func(url string) (*AppList, error) {
return nil, errTesting
})
go func() {
......@@ -55,8 +54,8 @@ func TestMarathonSDHandleError(t *testing.T) {
}
func TestMarathonSDEmptyList(t *testing.T) {
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return &marathon.AppList{}, nil
ch, md := newTestDiscovery(func(url string) (*AppList, error) {
return &AppList{}, nil
})
go func() {
select {
......@@ -73,28 +72,28 @@ func TestMarathonSDEmptyList(t *testing.T) {
}
}
func marathonTestAppList(labels map[string]string, runningTasks int) *marathon.AppList {
task := marathon.Task{
func marathonTestAppList(labels map[string]string, runningTasks int) *AppList {
task := Task{
ID: "test-task-1",
Host: "mesos-slave1",
Ports: []uint32{31000},
}
docker := marathon.DockerContainer{Image: "repo/image:tag"}
container := marathon.Container{Docker: docker}
app := marathon.App{
docker := DockerContainer{Image: "repo/image:tag"}
container := Container{Docker: docker}
app := App{
ID: "test-service",
Tasks: []marathon.Task{task},
Tasks: []Task{task},
RunningTasks: runningTasks,
Labels: labels,
Container: container,
}
return &marathon.AppList{
Apps: []marathon.App{app},
return &AppList{
Apps: []App{app},
}
}
func TestMarathonSDSendGroup(t *testing.T) {
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
ch, md := newTestDiscovery(func(url string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
})
go func() {
......@@ -123,7 +122,7 @@ func TestMarathonSDSendGroup(t *testing.T) {
}
func TestMarathonSDRemoveApp(t *testing.T) {
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
ch, md := newTestDiscovery(func(url string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
})
......@@ -142,7 +141,7 @@ func TestMarathonSDRemoveApp(t *testing.T) {
t.Fatalf("Got error on first update: %s", err)
}
md.client = func(url string) (*marathon.AppList, error) {
md.Client = func(url string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 0), nil
}
err = md.updateServices(ch)
......@@ -152,17 +151,17 @@ func TestMarathonSDRemoveApp(t *testing.T) {
}
func TestMarathonSDRunAndStop(t *testing.T) {
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
ch, md := newTestDiscovery(func(url string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
})
md.refreshInterval = time.Millisecond * 10
md.RefreshInterval = time.Millisecond * 10
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-ch:
cancel()
case <-time.After(md.refreshInterval * 3):
case <-time.After(md.RefreshInterval * 3):
cancel()
t.Fatalf("Update took too long.")
}
......
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package marathon
// Task describes one instance of a service running on Marathon.
type Task struct {
ID string `json:"id"`
Host string `json:"host"`
Ports []uint32 `json:"ports"`
}
// DockerContainer describes a container which uses the docker runtime.
type DockerContainer struct {
Image string `json:"image"`
}
// Container describes the runtime an app in running in.
type Container struct {
Docker DockerContainer `json:"docker"`
}
// App describes a service running on Marathon.
type App struct {
ID string `json:"id"`
Tasks []Task `json:"tasks"`
RunningTasks int `json:"tasksRunning"`
Labels map[string]string `json:"labels"`
Container Container `json:"container"`
}
// AppList is a list of Marathon apps.
type AppList struct {
Apps []App `json:"apps"`
}
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package marathon
import (
"fmt"
"math/rand"
)
const appListPath string = "/v2/apps/?embed=apps.tasks"
// RandomAppsURL randomly selects a server from an array and creates
// an URL pointing to the app list.
func RandomAppsURL(servers []string) string {
// TODO: If possible update server list from Marathon at some point.
server := servers[rand.Intn(len(servers))]
return fmt.Sprintf("%s%s", server, appListPath)
}
......@@ -374,7 +374,7 @@ func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
app("consul", i, k)
}
for i, c := range cfg.MarathonSDConfigs {
app("marathon", i, discovery.NewMarathonDiscovery(c))
app("marathon", i, discovery.NewMarathon(c))
}
for i, c := range cfg.KubernetesSDConfigs {
k, err := discovery.NewKubernetesDiscovery(c)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册