diff --git a/documentation/examples/prometheus-kubernetes.yml b/documentation/examples/prometheus-kubernetes.yml
index 8b45456a56d4619c1b31d5d505d61ed3dff2e7e9..c5c73c89494dbb3364b824dfc952cbd580d8f767 100644
--- a/documentation/examples/prometheus-kubernetes.yml
+++ b/documentation/examples/prometheus-kubernetes.yml
@@ -114,3 +114,35 @@ scrape_configs:
     target_label: kubernetes_namespace
   - source_labels: [__meta_kubernetes_service_name]
     target_label: kubernetes_name
+
+# Example scrape config for pods
+#
+# The relabeling allows the actual pod scrape endpoint to be configured via the
+# following annotations:
+#
+# * `prometheus.io/scrape`: Only scrape pods that have this annotation set to `true`.
+# * `prometheus.io/port`: Scrape the pod on the indicated port instead of the pod's default port (its lowest-numbered declared TCP port).
+- job_name: 'kubernetes-pods'
+
+  kubernetes_sd_configs:
+  - api_servers:
+    - 'https://kubernetes.default.svc'
+    in_cluster: true
+
+  relabel_configs:
+  - source_labels: [__meta_kubernetes_role, __meta_kubernetes_pod_annotation_prometheus_io_scrape]
+    action: keep
+    regex: pod;true
+  - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
+    action: replace
+    regex: (.+):(?:\d+);(\d+)
+    replacement: ${1}:${2}
+    target_label: __address__
+  - action: labelmap
+    regex: __meta_kubernetes_pod_label_(.+)
+  - source_labels: [__meta_kubernetes_pod_namespace]
+    action: replace
+    target_label: kubernetes_namespace
+  - source_labels: [__meta_kubernetes_pod_name]
+    action: replace
+    target_label: kubernetes_pod_name
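A note on the address-rewriting rule above: a minimal standalone sketch of how the regex and replacement interact, using Go's regexp package — the same RE2 engine Prometheus compiles relabel_configs with (ignoring the implicit anchoring Prometheus adds). The address and port values here are invented:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Relabeling concatenates the source label values with ";":
	// here __address__ followed by the prometheus.io/port annotation.
	src := "10.44.0.7:3000;8080"

	// Group 1 keeps the host, the non-capturing group drops the old port,
	// and group 2 captures the port taken from the annotation.
	re := regexp.MustCompile(`(.+):(?:\d+);(\d+)`)

	fmt.Println(re.ReplaceAllString(src, "${1}:${2}")) // 10.44.0.7:8080
}
```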
diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go
index 4f639c50c42ac4bf1fa3862a2f3952e34a8a479e..cb05250fcd21d1d30652b209d95ed0feca4a8a55 100644
--- a/retrieval/discovery/kubernetes/discovery.go
+++ b/retrieval/discovery/kubernetes/discovery.go
@@ -14,12 +14,16 @@
 package kubernetes
 
 import (
+	"bytes"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"net"
 	"net/http"
 	"os"
+	"sort"
+	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -33,27 +37,55 @@ import (
 )
 
 const (
-	sourceServicePrefix = "services"
-
 	// kubernetesMetaLabelPrefix is the meta prefix used for all meta labels.
 	// in this discovery.
 	metaLabelPrefix = model.MetaLabelPrefix + "kubernetes_"
+
+	// roleLabel is the name for the label containing a target's role.
+	roleLabel = metaLabelPrefix + "role"
+
+	sourcePodPrefix = "pods"
+	// podsTargetGroupName is the name given to the target group for pods.
+	podsTargetGroupName = "pods"
+	// podNamespaceLabel is the name for the label containing a target pod's namespace.
+	podNamespaceLabel = metaLabelPrefix + "pod_namespace"
+	// podNameLabel is the name for the label containing a target pod's name.
+	podNameLabel = metaLabelPrefix + "pod_name"
+	// podAddressLabel is the name for the label containing a target pod's IP address (the PodIP).
+	podAddressLabel = metaLabelPrefix + "pod_address"
+	// podContainerNameLabel is the name for the label containing a target's container name.
+	podContainerNameLabel = metaLabelPrefix + "pod_container_name"
+	// podContainerPortNameLabel is the name for the label containing the name of the port selected for a target.
+	podContainerPortNameLabel = metaLabelPrefix + "pod_container_port_name"
+	// podContainerPortListLabel is the name for the label containing a list of all TCP ports on the target container.
+	podContainerPortListLabel = metaLabelPrefix + "pod_container_port_list"
+	// podContainerPortMapPrefix is the prefix used to create the names of labels that map container port names to port values.
+	// Such labels will be named (podContainerPortMapPrefix)(PortName) = (ContainerPort).
+	podContainerPortMapPrefix = metaLabelPrefix + "pod_container_port_map_"
+	// podReadyLabel is the name for the label containing the 'Ready' status (true/false/unknown) for a target.
+	podReadyLabel = metaLabelPrefix + "pod_ready"
+	// podLabelPrefix is the prefix for Prometheus label names corresponding to Kubernetes labels for a target pod.
+	podLabelPrefix = metaLabelPrefix + "pod_label_"
+	// podAnnotationPrefix is the prefix for Prometheus label names corresponding to Kubernetes annotations for a target pod.
+	podAnnotationPrefix = metaLabelPrefix + "pod_annotation_"
+
+	sourceServicePrefix = "services"
 	// serviceNamespaceLabel is the name for the label containing a target's service namespace.
 	serviceNamespaceLabel = metaLabelPrefix + "service_namespace"
 	// serviceNameLabel is the name for the label containing a target's service name.
 	serviceNameLabel = metaLabelPrefix + "service_name"
-	// nodeLabelPrefix is the prefix for the node labels.
-	nodeLabelPrefix = metaLabelPrefix + "node_label_"
 	// serviceLabelPrefix is the prefix for the service labels.
 	serviceLabelPrefix = metaLabelPrefix + "service_label_"
 	// serviceAnnotationPrefix is the prefix for the service annotations.
 	serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_"
+
+	// nodesTargetGroupName is the name given to the target group for nodes.
 	nodesTargetGroupName = "nodes"
+	// nodeLabelPrefix is the prefix for the node labels.
+	nodeLabelPrefix = metaLabelPrefix + "node_label_"
+
+	// apiServersTargetGroupName is the name given to the target group for API servers.
 	apiServersTargetGroupName = "apiServers"
-	// roleLabel is the name for the label containing a target's role.
-	roleLabel = metaLabelPrefix + "role"
 
 	serviceAccountToken  = "/var/run/secrets/kubernetes.io/serviceaccount/token"
 	serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
 
@@ -61,6 +93,7 @@ const (
 	apiVersion          = "v1"
 	apiPrefix           = "/api/" + apiVersion
 	nodesURL            = apiPrefix + "/nodes"
+	podsURL             = apiPrefix + "/pods"
 	servicesURL         = apiPrefix + "/services"
 	endpointsURL        = apiPrefix + "/endpoints"
 	serviceEndpointsURL = apiPrefix + "/namespaces/%s/endpoints/%s"
@@ -75,9 +108,12 @@ type Discovery struct {
 	apiServersMu sync.RWMutex
 	nodes        map[string]*Node
 	services     map[string]map[string]*Service
-	nodesMu      sync.RWMutex
-	servicesMu   sync.RWMutex
-	runDone      chan struct{}
+	// Map of namespace to (map of pod name to pod).
+	pods       map[string]map[string]*Pod
+	nodesMu    sync.RWMutex
+	servicesMu sync.RWMutex
+	podsMu     sync.RWMutex
+	runDone    chan struct{}
 }
 
 // Initialize sets up the discovery for usage.
@@ -97,6 +133,7 @@ func (kd *Discovery) Initialize() error {
 
 // Run implements the TargetProvider interface.
 func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
+	log.Debugf("Kubernetes Discovery.Run beginning")
 	defer close(ch)
 
 	// Send an initial full view.
@@ -106,6 +143,12 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 
 	all = append(all, kd.updateAPIServersTargetGroup())
 	all = append(all, kd.updateNodesTargetGroup())
+	all = append(all, kd.updatePodsTargetGroup())
+	for _, ns := range kd.pods {
+		for _, pod := range ns {
+			all = append(all, kd.updatePodTargetGroup(pod))
+		}
+	}
 
 	select {
 	case ch <- all:
@@ -119,21 +162,32 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 
 	go kd.watchNodes(update, ctx.Done(), retryInterval)
 	go kd.startServiceWatch(update, ctx.Done(), retryInterval)
+	go kd.watchPods(update, ctx.Done(), retryInterval)
 
-	var tg *config.TargetGroup
 	for {
+		tg := []*config.TargetGroup{}
 		select {
 		case <-ctx.Done():
 			return
 		case event := <-update:
 			switch obj := event.(type) {
 			case *nodeEvent:
+				log.Debugf("k8s discovery received node event (EventType=%s, Node Name=%s)", obj.EventType, obj.Node.ObjectMeta.Name)
 				kd.updateNode(obj.Node, obj.EventType)
-				tg = kd.updateNodesTargetGroup()
+				tg = append(tg, kd.updateNodesTargetGroup())
 			case *serviceEvent:
-				tg = kd.updateService(obj.Service, obj.EventType)
+				log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", obj.EventType, obj.Service.ObjectMeta.Name)
+				tg = append(tg, kd.updateService(obj.Service, obj.EventType))
 			case *endpointsEvent:
-				tg = kd.updateServiceEndpoints(obj.Endpoints, obj.EventType)
+				log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", obj.EventType, obj.Endpoints.ObjectMeta.Name)
+				tg = append(tg, kd.updateServiceEndpoints(obj.Endpoints, obj.EventType))
+			case *podEvent:
+				log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", obj.EventType, obj.Pod.ObjectMeta.Name)
+				// Update the per-pod target group...
+				kd.updatePod(obj.Pod, obj.EventType)
+				tg = append(tg, kd.updatePodTargetGroup(obj.Pod))
+				// ...and the all-pods target group.
+				tg = append(tg, kd.updatePodsTargetGroup())
 			}
 		}
 
@@ -141,10 +195,12 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 			continue
 		}
 
-		select {
-		case ch <- []*config.TargetGroup{tg}:
-		case <-ctx.Done():
-			return
+		for _, t := range tg {
+			select {
+			case ch <- []*config.TargetGroup{t}:
+			case <-ctx.Done():
+				return
+			}
 		}
 	}
 }
@@ -173,7 +229,7 @@ func (kd *Discovery) queryAPIServerReq(req *http.Request) (*http.Response, error
 		lastErr = err
 		kd.rotateAPIServers()
 	}
-	return nil, fmt.Errorf("Unable to query any API servers: %v", lastErr)
+	return nil, fmt.Errorf("unable to query any API servers: %v", lastErr)
 }
 
 func (kd *Discovery) rotateAPIServers() {
@@ -267,17 +323,17 @@ func (kd *Discovery) getNodes() (map[string]*Node, string, error) {
 	if err != nil {
 		// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
 		// & return error.
-		return nil, "", fmt.Errorf("Unable to list Kubernetes nodes: %s", err)
+		return nil, "", fmt.Errorf("unable to list Kubernetes nodes: %s", err)
 	}
 	defer res.Body.Close()
 	if res.StatusCode != http.StatusOK {
-		return nil, "", fmt.Errorf("Unable to list Kubernetes nodes. Unexpected response: %d %s", res.StatusCode, res.Status)
+		return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response: %d %s", res.StatusCode, res.Status)
 	}
 	var nodes NodeList
 	if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil {
 		body, _ := ioutil.ReadAll(res.Body)
-		return nil, "", fmt.Errorf("Unable to list Kubernetes nodes. Unexpected response body: %s", string(body))
+		return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response body: %s", string(body))
 	}
 
 	nodeMap := map[string]*Node{}
@@ -293,16 +349,16 @@ func (kd *Discovery) getServices() (map[string]map[string]*Service, string, erro
 	if err != nil {
 		// If we can't list services then we can't watch them. Assume this is a misconfiguration
 		// & return error.
-		return nil, "", fmt.Errorf("Unable to list Kubernetes services: %s", err)
+		return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err)
 	}
 	defer res.Body.Close()
 	if res.StatusCode != http.StatusOK {
-		return nil, "", fmt.Errorf("Unable to list Kubernetes services. Unexpected response: %d %s", res.StatusCode, res.Status)
+		return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status)
 	}
 	var services ServiceList
 	if err := json.NewDecoder(res.Body).Decode(&services); err != nil {
 		body, _ := ioutil.ReadAll(res.Body)
-		return nil, "", fmt.Errorf("Unable to list Kubernetes services. Unexpected response body: %s", string(body))
+		return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body))
 	}
 
 	serviceMap := map[string]map[string]*Service{}
@@ -735,3 +791,243 @@ func nodeHostIP(node *Node) (net.IP, error) {
 	}
 	return nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
 }
+
+////////////////////////////////////
+// Here there be dragons.         //
+// Pod discovery code lies below. //
+////////////////////////////////////
+
+func (kd *Discovery) updatePod(pod *Pod, eventType EventType) {
+	kd.podsMu.Lock()
+	defer kd.podsMu.Unlock()
+
+	switch eventType {
+	case deleted:
+		if _, ok := kd.pods[pod.ObjectMeta.Namespace]; ok {
+			delete(kd.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name)
+			if len(kd.pods[pod.ObjectMeta.Namespace]) == 0 {
+				delete(kd.pods, pod.ObjectMeta.Namespace)
+			}
+		}
+	case added, modified:
+		if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok {
+			kd.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{}
+		}
+		kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod
+	}
+}
+
+func (kd *Discovery) getPods() (map[string]map[string]*Pod, string, error) {
+	res, err := kd.queryAPIServerPath(podsURL)
+	if err != nil {
+		return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err)
+	}
+	defer res.Body.Close()
+	if res.StatusCode != http.StatusOK {
+		return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status)
+	}
+
+	var pods PodList
+	if err := json.NewDecoder(res.Body).Decode(&pods); err != nil {
+		body, _ := ioutil.ReadAll(res.Body)
+		return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body))
+	}
+
+	podMap := map[string]map[string]*Pod{}
+	for idx, pod := range pods.Items {
+		if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok {
+			podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{}
+		}
+		log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace)
+		podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx]
+	}
+
+	return podMap, pods.ResourceVersion, nil
+}
+
+func (kd *Discovery) watchPods(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
+	until(func() {
+		pods, resourceVersion, err := kd.getPods()
+		if err != nil {
+			log.Errorf("Cannot initialize pods collection: %s", err)
+			return
+		}
+		kd.podsMu.Lock()
+		kd.pods = pods
+		kd.podsMu.Unlock()
+
+		req, err := http.NewRequest("GET", podsURL, nil)
+		if err != nil {
+			log.Errorf("Cannot create pods request: %s", err)
+			return
+		}
+
+		values := req.URL.Query()
+		values.Add("watch", "true")
+		values.Add("resourceVersion", resourceVersion)
+		req.URL.RawQuery = values.Encode()
+		res, err := kd.queryAPIServerReq(req)
+		if err != nil {
+			log.Errorf("Failed to watch pods: %s", err)
+			return
+		}
+		defer res.Body.Close()
+		if res.StatusCode != http.StatusOK {
+			log.Errorf("Failed to watch pods: %d", res.StatusCode)
+			return
+		}
+
+		d := json.NewDecoder(res.Body)
+
+		for {
+			var event podEvent
+			if err := d.Decode(&event); err != nil {
+				log.Errorf("Watch pods unexpectedly closed: %s", err)
+				return
+			}
+
+			select {
+			case events <- &event:
+			case <-done:
+			}
+		}
+	}, retryInterval, done)
+}
+
+func podSource(pod *Pod) string {
+	return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name
+}
+
+type ByContainerPort []ContainerPort
+
+func (a ByContainerPort) Len() int           { return len(a) }
+func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort }
+func (a ByContainerPort) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+
+type ByContainerName []Container
+
+func (a ByContainerName) Len() int           { return len(a) }
+func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name }
+func (a ByContainerName) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+
+func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet {
+	targets := make([]model.LabelSet, 0, len(pod.PodSpec.Containers))
+	if pod.PodStatus.PodIP == "" {
+		log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name)
+		return targets
+	}
+
+	if pod.PodStatus.Phase != "Running" {
+		log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name)
+		return targets
+	}
+
+	ready := "unknown"
+	for _, cond := range pod.PodStatus.Conditions {
+		if strings.ToLower(cond.Type) == "ready" {
+			ready = strings.ToLower(cond.Status)
+		}
+	}
+
+	sort.Sort(ByContainerName(pod.PodSpec.Containers))
+
+	for _, container := range pod.PodSpec.Containers {
+		// Collect a list of TCP ports.
+		// Sort by port number, ascending.
+		// Produce a target pointed at the first port.
+		// Include a label containing all ports (portName=port,portName=port,...,).
+		var tcpPorts []ContainerPort
+		portLabel := bytes.NewBufferString(",")
+
+		for _, port := range container.Ports {
+			if port.Protocol == "TCP" {
+				tcpPorts = append(tcpPorts, port)
+			}
+		}
+
+		if len(tcpPorts) == 0 {
+			log.Debugf("skipping container %s with no TCP ports", container.Name)
+			continue
+		}
+
+		sort.Sort(ByContainerPort(tcpPorts))
+
+		t := model.LabelSet{
+			model.AddressLabel:        model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))),
+			podNameLabel:              model.LabelValue(pod.ObjectMeta.Name),
+			podAddressLabel:           model.LabelValue(pod.PodStatus.PodIP),
+			podNamespaceLabel:         model.LabelValue(pod.ObjectMeta.Namespace),
+			podContainerNameLabel:     model.LabelValue(container.Name),
+			podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name),
+			podReadyLabel:             model.LabelValue(ready),
+		}
+
+		for _, port := range tcpPorts {
+			portLabel.WriteString(port.Name)
+			portLabel.WriteString("=")
+			portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10))
+			portLabel.WriteString(",")
+			t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10))
+		}
+
+		t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String())
+
+		for k, v := range pod.ObjectMeta.Labels {
+			labelName := strutil.SanitizeLabelName(podLabelPrefix + k)
+			t[model.LabelName(labelName)] = model.LabelValue(v)
+		}
+
+		for k, v := range pod.ObjectMeta.Annotations {
+			labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k)
+			t[model.LabelName(labelName)] = model.LabelValue(v)
+		}
+
+		targets = append(targets, t)
+
+		if !allContainers {
+			break
+		}
+	}
+	return targets
+}
+
+func (kd *Discovery) updatePodTargetGroup(pod *Pod) *config.TargetGroup {
+	kd.podsMu.RLock()
+	defer kd.podsMu.RUnlock()
+
+	tg := &config.TargetGroup{
+		Source: podSource(pod),
+	}
+
+	// If this pod doesn't exist, return an empty target group.
+	if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok {
+		return tg
+	}
+	if _, ok := kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok {
+		return tg
+	}
+
+	tg.Labels = model.LabelSet{
+		roleLabel: model.LabelValue("container"),
+	}
+	tg.Targets = updatePodTargets(pod, true)
+
+	return tg
+}
+
+func (kd *Discovery) updatePodsTargetGroup() *config.TargetGroup {
+	tg := &config.TargetGroup{
+		Source: podsTargetGroupName,
+		Labels: model.LabelSet{
+			roleLabel: model.LabelValue("pod"),
+		},
+	}
+
+	for _, namespace := range kd.pods {
+		for _, pod := range namespace {
+			tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...)
+		}
+	}
+
+	return tg
+}
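A note on the `__meta_kubernetes_pod_container_port_list` value built by `updatePodTargets` above: the label keeps a leading and a trailing comma, and entries appear in ascending port order. The following self-contained sketch mirrors (but is not part of) the patch, using invented port data:

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
)

func main() {
	// Hypothetical TCP ports, already sorted the way ByContainerPort sorts them.
	ports := []struct {
		Name string
		Port int32
	}{{"ssh", 22}, {"http", 80}}

	// Mirrors the portLabel construction in updatePodTargets.
	portLabel := bytes.NewBufferString(",")
	for _, p := range ports {
		portLabel.WriteString(p.Name)
		portLabel.WriteString("=")
		portLabel.WriteString(strconv.FormatInt(int64(p.Port), 10))
		portLabel.WriteString(",")
	}

	fmt.Println(portLabel.String()) // ,ssh=22,http=80,
}
```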
diff --git a/retrieval/discovery/kubernetes/discovery_test.go b/retrieval/discovery/kubernetes/discovery_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..78144791c3d907d6abef2730b8fdf518a6ebc633
--- /dev/null
+++ b/retrieval/discovery/kubernetes/discovery_test.go
@@ -0,0 +1,207 @@
+// Copyright 2015 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+	"flag"
+	"math/rand"
+	"os"
+	"testing"
+
+	_ "github.com/prometheus/common/log"
+	"github.com/prometheus/common/model"
+)
+
+func TestMain(m *testing.M) {
+	flag.Parse()
+	os.Exit(m.Run())
+}
+
+var portsA = []ContainerPort{
+	ContainerPort{
+		Name:          "http",
+		ContainerPort: 80,
+		Protocol:      "TCP",
+	},
+}
+
+var portsB = []ContainerPort{
+	ContainerPort{
+		Name:          "https",
+		ContainerPort: 443,
+		Protocol:      "TCP",
+	},
+}
+
+var portsNoTCP = []ContainerPort{
+	ContainerPort{
+		Name:          "dns",
+		ContainerPort: 53,
+		Protocol:      "UDP",
+	},
+}
+
+var portsMultiA = []ContainerPort{
+	ContainerPort{
+		Name:          "http",
+		ContainerPort: 80,
+		Protocol:      "TCP",
+	},
+	ContainerPort{
+		Name:          "ssh",
+		ContainerPort: 22,
+		Protocol:      "TCP",
+	},
+}
+
+var portsMultiB = []ContainerPort{
+	ContainerPort{
+		Name:          "http",
+		ContainerPort: 80,
+		Protocol:      "TCP",
+	},
+	ContainerPort{
+		Name:          "https",
+		ContainerPort: 443,
+		Protocol:      "TCP",
+	},
+}
+
+func container(name string, ports []ContainerPort) Container {
+	p := make([]ContainerPort, len(ports))
+	copy(p, ports)
+
+	// Shuffle the order of the ports to ensure the code under test enforces determinism.
+	for i := range p {
+		j := rand.Intn(i + 1)
+		p[i], p[j] = p[j], p[i]
+	}
+
+	return Container{
+		Name:  name,
+		Ports: p,
+	}
+}
+
+func pod(name string, containers []Container) *Pod {
+	c := make([]Container, len(containers))
+	copy(c, containers)
+
+	// Shuffle the order of the containers to ensure the code under test enforces determinism.
+	for i := range c {
+		j := rand.Intn(i + 1)
+		c[i], c[j] = c[j], c[i]
+	}
+
+	return &Pod{
+		ObjectMeta: ObjectMeta{
+			Name: name,
+		},
+		PodStatus: PodStatus{
+			PodIP: "1.1.1.1",
+			Phase: "Running",
+			Conditions: []PodCondition{
+				PodCondition{
+					Type:   "Ready",
+					Status: "True",
+				},
+			},
+		},
+		PodSpec: PodSpec{
+			Containers: c,
+		},
+	}
+}
+
+func TestUpdatePodTargets(t *testing.T) {
+	var result []model.LabelSet
+
+	// Multiple iterations help ensure that we'll see different permutations via the various randomizations that occur.
+	for i := 0; i < 100; i++ {
+		// Return no targets for a pod that isn't "Running"
+		result = updatePodTargets(&Pod{PodStatus: PodStatus{PodIP: "1.1.1.1"}}, true)
+		if len(result) > 0 {
+			t.Fatalf("expected 0 targets, received %d", len(result))
+		}
+
+		// Return no targets for a pod with no IP
+		result = updatePodTargets(&Pod{PodStatus: PodStatus{Phase: "Running"}}, true)
+		if len(result) > 0 {
+			t.Fatalf("expected 0 targets, received %d", len(result))
+		}
+
+		// A pod with no containers (?!) should not produce any targets
+		result = updatePodTargets(pod("empty", []Container{}), true)
+		if len(result) > 0 {
+			t.Fatalf("expected 0 targets, received %d", len(result))
+		}
+
+		// A pod with all valid containers should return one target per container with allContainers=true
+		result = updatePodTargets(pod("easy", []Container{container("a", portsA), container("b", portsB)}), true)
+		if len(result) != 2 {
+			t.Fatalf("expected 2 targets, received %d", len(result))
+		}
+		if result[0][podReadyLabel] != "true" {
+			t.Fatalf("expected result[0] podReadyLabel 'true', received '%s'", result[0][podReadyLabel])
+		}
+		if _, ok := result[0][podContainerPortMapPrefix+"http"]; !ok {
+			t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was missing")
+		}
+		if result[0][podContainerPortMapPrefix+"http"] != "80" {
+			t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was %s", result[0][podContainerPortMapPrefix+"http"])
+		}
+		if _, ok := result[1][podContainerPortMapPrefix+"https"]; !ok {
+			t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was missing")
+		}
+		if result[1][podContainerPortMapPrefix+"https"] != "443" {
+			t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was %s", result[1][podContainerPortMapPrefix+"https"])
+		}
+
+		// A pod with all valid containers should return one target with allContainers=false, and it should be the alphabetically first container
+		result = updatePodTargets(pod("easy", []Container{container("a", portsA), container("b", portsB)}), false)
+		if len(result) != 1 {
+			t.Fatalf("expected 1 target, received %d", len(result))
+		}
+		if _, ok := result[0][podContainerNameLabel]; !ok {
+			t.Fatalf("expected result[0][podContainerNameLabel] to be 'a', but was missing")
+		}
+		if result[0][podContainerNameLabel] != "a" {
+			t.Fatalf("expected result[0][podContainerNameLabel] to be 'a', but was '%s'", result[0][podContainerNameLabel])
+		}
+
+		// A pod with some non-targetable containers should return one target per targetable container with allContainers=true
+		result = updatePodTargets(pod("mixed", []Container{container("a", portsA), container("no-tcp", portsNoTCP), container("b", portsB)}), true)
+		if len(result) != 2 {
+			t.Fatalf("expected 2 targets, received %d", len(result))
+		}
+
+		// A pod with a container with multiple ports should be targeted on the numerically smallest port
+		result = updatePodTargets(pod("hard", []Container{container("multiA", portsMultiA), container("multiB", portsMultiB)}), true)
+		if len(result) != 2 {
+			t.Fatalf("expected 2 targets, received %d", len(result))
+		}
+		if result[0][model.AddressLabel] != "1.1.1.1:22" {
+			t.Fatalf("expected result[0] address to be 1.1.1.1:22, received %s", result[0][model.AddressLabel])
+		}
+		if result[0][podContainerPortListLabel] != ",ssh=22,http=80," {
+			t.Fatalf("expected result[0] podContainerPortListLabel to be ',ssh=22,http=80,', received '%s'", result[0][podContainerPortListLabel])
+		}
+		if result[1][model.AddressLabel] != "1.1.1.1:80" {
+			t.Fatalf("expected result[1] address to be 1.1.1.1:80, received %s", result[1][model.AddressLabel])
+		}
+		if result[1][podContainerPortListLabel] != ",http=80,https=443," {
+			t.Fatalf("expected result[1] podContainerPortListLabel to be ',http=80,https=443,', received '%s'", result[1][podContainerPortListLabel])
+		}
+	}
+}
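The tests above exercise target construction only; the watch path in `watchPods` is easier to picture with a small sketch. The structs below are simplified stand-ins for the types defined in types.go, and the two-event stream is fabricated, but the decode loop has the same shape: the API server's watch endpoint emits one JSON document per event, and a `json.Decoder` consumes them back to back:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Simplified stand-ins for the real Pod/podEvent types.
type pod struct {
	Metadata struct {
		Name string `json:"name"`
	} `json:"metadata"`
}

type event struct {
	Type   string `json:"type"`
	Object pod    `json:"object"`
}

func main() {
	stream := `{"type":"ADDED","object":{"metadata":{"name":"web-1"}}}
{"type":"MODIFIED","object":{"metadata":{"name":"web-1"}}}`

	d := json.NewDecoder(strings.NewReader(stream))
	for {
		var ev event
		if err := d.Decode(&ev); err != nil {
			break // io.EOF: stream finished
		}
		fmt.Println(ev.Type, ev.Object.Metadata.Name)
	}
	// Output:
	// ADDED web-1
	// MODIFIED web-1
}
```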
diff --git a/retrieval/discovery/kubernetes/types.go b/retrieval/discovery/kubernetes/types.go
index 28f5c31bfa85e460e812bf097c3c4d303c6e2e60..ba4479c7da1bef615a430f9ee08828edde22707a 100644
--- a/retrieval/discovery/kubernetes/types.go
+++ b/retrieval/discovery/kubernetes/types.go
@@ -102,6 +102,14 @@ type Container struct {
 	Name string `json:"name" description:"name of the container; must be a DNS_LABEL and unique within the pod; cannot be updated"`
 	// Optional.
 	Image string `json:"image,omitempty" description:"Docker image name; see http://releases.k8s.io/HEAD/docs/user-guide/images.md"`
+
+	Ports []ContainerPort `json:"ports"`
+}
+
+type ContainerPort struct {
+	Name          string `json:"name"`
+	ContainerPort int32  `json:"containerPort"`
+	Protocol      string `json:"protocol"`
 }
 
 // Service is a named abstraction of software service (for example, mysql) consisting of local port
@@ -235,3 +243,35 @@ type NodeList struct {
 
 	Items []Node `json:"items" description:"list of nodes"`
 }
+
+type Pod struct {
+	ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
+	PodStatus  `json:"status,omitempty" description:"pod status object; see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_podstatus"`
+	PodSpec    `json:"spec,omitempty" description:"pod spec object; see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_podspec"`
+}
+
+type podEvent struct {
+	EventType EventType `json:"type"`
+	Pod       *Pod      `json:"object"`
+}
+
+type PodList struct {
+	ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
+
+	Items []Pod `json:"items" description:"list of pods"`
+}
+
+type PodStatus struct {
+	Phase      string         `json:"phase" description:"current condition of the pod; see http://kubernetes.io/v1.1/docs/user-guide/pod-states.html#pod-phase"`
+	PodIP      string         `json:"podIP" description:"IP address allocated to the pod; routable at least within the cluster; empty if not yet allocated"`
+	Conditions []PodCondition `json:"conditions" description:"current service state of pod"`
+}
+
+type PodSpec struct {
+	Containers []Container `json:"containers" description:"list of containers; see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_container"`
+}
+
+type PodCondition struct {
+	Type   string `json:"type" description:"type of the condition; currently only Ready"`
+	Status string `json:"status" description:"status of the condition; can be True, False, or Unknown"`
+}
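One subtlety in the Pod type above: it embeds ObjectMeta, PodStatus, and PodSpec rather than naming them as fields, so their fields are promoted onto Pod. That is why discovery.go can refer to both `pod.PodIP` and `pod.PodStatus.PodIP` interchangeably. A minimal sketch with trimmed-down types and an invented IP:

```go
package main

import "fmt"

type PodStatus struct {
	Phase string
	PodIP string
}

type Pod struct {
	PodStatus // embedded: its fields are promoted onto Pod
}

func main() {
	p := Pod{PodStatus: PodStatus{Phase: "Running", PodIP: "10.1.2.3"}}

	// Both expressions name the same field.
	fmt.Println(p.PodIP == p.PodStatus.PodIP) // true
}
```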