replicationcontrollerdetail.go 14.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
18
	"bytes"
M
Marcin Maciaszczyk 已提交
19 20
	"log"

21
	"k8s.io/kubernetes/pkg/api"
22
	unversioned "k8s.io/kubernetes/pkg/api/unversioned"
23
	client "k8s.io/kubernetes/pkg/client/unversioned"
24 25
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/labels"
26 27
)

28 29 30
// ReplicationControllerDetail represents detailed information about a Replication Controller.
type ReplicationControllerDetail struct {
	// Name of the Replication Controller.
31 32
	Name string `json:"name"`

33
	// Namespace the Replication Controller is in.
34 35
	Namespace string `json:"namespace"`

36
	// Label mapping of the Replication Controller.
37 38
	Labels map[string]string `json:"labels"`

39
	// Label selector of the Replication Controller.
40 41
	LabelSelector map[string]string `json:"labelSelector"`

42
	// Container image list of the pod template specified by this Replication Controller.
43 44
	ContainerImages []string `json:"containerImages"`

45 46
	// Aggregate information about pods of this replication controller.
	PodInfo ReplicationControllerPodInfo `json:"podInfo"`
47

48 49
	// Detailed information about Pods belonging to this Replication Controller.
	Pods []ReplicationControllerPod `json:"pods"`
50

51
	// Detailed information about service related to Replication Controller.
52
	Services []ServiceDetail `json:"services"`
53 54 55

	// True when the data contains at least one pod with metrics information, false otherwise.
	HasMetrics bool `json:"hasMetrics"`
56 57
}

58
// ReplicationControllerPod is a representation of a Pod that belongs to a Replication Controller.
59
type ReplicationControllerPod struct {
60 61 62
	// Name of the Pod.
	Name string `json:"name"`

63 64 65
	// Status of the Pod. See Kubernetes API for reference.
	PodPhase api.PodPhase `json:"podPhase"`

66 67 68 69 70 71 72 73
	// Time the Pod has started. Empty if not started.
	StartTime *unversioned.Time `json:"startTime"`

	// IP address of the Pod.
	PodIP string `json:"podIP"`

	// Name of the Node this Pod runs on.
	NodeName string `json:"nodeName"`
74 75 76

	// Count of containers restarts.
	RestartCount int `json:"restartCount"`
77 78

	// Pod metrics.
79
	Metrics *PodMetrics `json:"metrics"`
80 81
}

82
// ServiceDetail is a representation of a Service connected to Replication Controller.
83
type ServiceDetail struct {
84 85 86
	// Name of the service.
	Name string `json:"name"`

87
	// Internal endpoints of all Kubernetes services that have the same label selector as connected
88
	// Replication Controller.
89
	// Endpoint is DNS name merged with ports.
90
	InternalEndpoint Endpoint `json:"internalEndpoint"`
91 92

	// External endpoints of all Kubernetes services that have the same label selector as connected
93
	// Replication Controller.
94
	// Endpoint is external IP address name merged with ports.
95
	ExternalEndpoints []Endpoint `json:"externalEndpoints"`
96 97 98 99 100

	// Label selector of the service.
	Selector map[string]string `json:"selector"`
}

101
// ServicePort is a pair of port and protocol, e.g. a service endpoint.
102 103 104 105 106 107 108 109
type ServicePort struct {
	// Positive port number.
	Port int `json:"port"`

	// Protocol name, e.g., TCP or UDP.
	Protocol api.Protocol `json:"protocol"`
}

110
// Endpoint describes an endpoint that is host and a list of available ports for that host.
111 112 113 114 115 116 117 118
type Endpoint struct {
	// Hostname, either as a domain name or IP address.
	Host string `json:"host"`

	// List of ports opened for this endpoint on the hostname.
	Ports []ServicePort `json:"ports"`
}

119 120
// Information needed to update replication controller
type ReplicationControllerSpec struct {
121 122 123 124
	// Replicas (pods) number in replicas set
	Replicas int `json:"replicas"`
}

125 126 127 128
// Returns detailed information about the given replication controller in the given namespace.
func GetReplicationControllerDetail(client client.Interface, heapsterClient HeapsterClient,
	namespace, name string) (*ReplicationControllerDetail, error) {
	log.Printf("Getting details of %s replication controller in %s namespace", name, namespace)
129

130
	replicationControllerWithPods, err := getRawReplicationControllerWithPods(client, namespace, name)
131 132 133
	if err != nil {
		return nil, err
	}
134 135
	replicationController := replicationControllerWithPods.ReplicationController
	pods := replicationControllerWithPods.Pods
136

137
	replicationControllerMetricsByPod, err := getReplicationControllerPodsMetrics(pods, heapsterClient, namespace, name)
138
	if err != nil {
139
		log.Printf("Skipping Heapster metrics because of error: %s\n", err)
140 141
	}

B
bryk 已提交
142 143 144
	services, err := client.Services(namespace).List(api.ListOptions{
		LabelSelector: labels.Everything(),
		FieldSelector: fields.Everything(),
145 146 147 148 149 150
	})

	if err != nil {
		return nil, err
	}

151 152 153 154 155 156
	replicationControllerDetail := &ReplicationControllerDetail{
		Name:          replicationController.Name,
		Namespace:     replicationController.Namespace,
		Labels:        replicationController.ObjectMeta.Labels,
		LabelSelector: replicationController.Spec.Selector,
		PodInfo:       getReplicationControllerPodInfo(replicationController, pods.Items),
157 158
	}

159
	matchingServices := getMatchingServices(services.Items, replicationController)
160

M
Marcin Maciaszczyk 已提交
161 162 163 164 165
	// Anonymous callback function to get nodes by their names.
	getNodeFn := func(nodeName string) (*api.Node, error) {
		return client.Nodes().Get(nodeName)
	}

166
	for _, service := range matchingServices {
M
Marcin Maciaszczyk 已提交
167 168
		replicationControllerDetail.Services = append(replicationControllerDetail.Services,
			getServiceDetail(service, *replicationController, pods.Items, getNodeFn))
169 170
	}

171
	for _, container := range replicationController.Spec.Template.Spec.Containers {
M
Marcin Maciaszczyk 已提交
172 173
		replicationControllerDetail.ContainerImages = append(replicationControllerDetail.ContainerImages,
			container.Image)
174 175 176
	}

	for _, pod := range pods.Items {
177
		podDetail := ReplicationControllerPod{
178
			Name:         pod.Name,
179
			PodPhase:     pod.Status.Phase,
180 181 182 183
			StartTime:    pod.Status.StartTime,
			PodIP:        pod.Status.PodIP,
			NodeName:     pod.Spec.NodeName,
			RestartCount: getRestartCount(pod),
184
		}
185 186
		if replicationControllerMetricsByPod != nil {
			metric := replicationControllerMetricsByPod.MetricsMap[pod.Name]
187
			podDetail.Metrics = &metric
188
			replicationControllerDetail.HasMetrics = true
189
		}
190
		replicationControllerDetail.Pods = append(replicationControllerDetail.Pods, podDetail)
191 192
	}

193
	return replicationControllerDetail, nil
194
}
195

196 197 198 199 200 201
// TODO(floreks): This should be transactional to make sure that RC will not be deleted without pods
// Deletes replication controller with given name in given namespace and related pods.
// Also deletes services related to replication controller if deleteServices is true.
func DeleteReplicationController(client client.Interface, namespace, name string,
	deleteServices bool) error {

202
	log.Printf("Deleting %s replication controller from %s namespace", name, namespace)
M
Marcin Maciaszczyk 已提交
203

204 205 206 207 208 209
	if deleteServices {
		if err := DeleteReplicationControllerServices(client, namespace, name); err != nil {
			return err
		}
	}

210
	pods, err := getRawReplicationControllerPods(client, namespace, name)
211 212 213 214 215 216 217 218 219 220 221 222 223 224
	if err != nil {
		return err
	}

	if err := client.ReplicationControllers(namespace).Delete(name); err != nil {
		return err
	}

	for _, pod := range pods.Items {
		if err := client.Pods(namespace).Delete(pod.Name, &api.DeleteOptions{}); err != nil {
			return err
		}
	}

225
	log.Printf("Successfully deleted %s replication controller from %s namespace", name, namespace)
M
Marcin Maciaszczyk 已提交
226

227 228 229
	return nil
}

230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
// Deletes services related to replication controller with given name in given namespace.
func DeleteReplicationControllerServices(client client.Interface, namespace, name string) error {
	log.Printf("Deleting services related to %s replication controller from %s namespace", name,
		namespace)

	replicationController, err := client.ReplicationControllers(namespace).Get(name)
	if err != nil {
		return err
	}

	labelSelector, err := toLabelSelector(replicationController.Spec.Selector)
	if err != nil {
		return err
	}

	services, err := getServicesForDeletion(client, labelSelector, namespace)
	if err != nil {
		return err
	}

	for _, service := range services {
		if err := client.Services(namespace).Delete(service.Name); err != nil {
			return err
		}
	}

	log.Printf("Successfully deleted services related to %s replication controller from %s namespace",
		name, namespace)

	return nil
}

262
// Updates number of replicas in Replication Controller based on Replication Controller Spec
M
Marcin Maciaszczyk 已提交
263
func UpdateReplicasCount(client client.Interface, namespace, name string,
264 265 266
	replicationControllerSpec *ReplicationControllerSpec) error {
	log.Printf("Updating replicas count to %d for %s replication controller from %s namespace",
		replicationControllerSpec.Replicas, name, namespace)
M
Marcin Maciaszczyk 已提交
267

268
	replicationController, err := client.ReplicationControllers(namespace).Get(name)
269 270 271 272
	if err != nil {
		return err
	}

273
	replicationController.Spec.Replicas = replicationControllerSpec.Replicas
274

275
	_, err = client.ReplicationControllers(namespace).Update(replicationController)
276 277 278 279
	if err != nil {
		return err
	}

280 281
	log.Printf("Successfully updated replicas count to %d for %s replication controller from %s namespace",
		replicationControllerSpec.Replicas, name, namespace)
M
Marcin Maciaszczyk 已提交
282

283 284 285
	return nil
}

286
// Returns detailed information about service from given service
M
Marcin Maciaszczyk 已提交
287 288 289
func getServiceDetail(service api.Service, replicationController api.ReplicationController,
	pods []api.Pod, getNodeFn GetNodeFunc) ServiceDetail {
	return ServiceDetail{
290
		Name: service.ObjectMeta.Name,
291 292
		InternalEndpoint: getInternalEndpoint(service.Name, service.Namespace,
			service.Spec.Ports),
M
Marcin Maciaszczyk 已提交
293
		ExternalEndpoints: getExternalEndpoints(replicationController, pods, service, getNodeFn),
294 295 296
		Selector:          service.Spec.Selector,
	}
}
297 298 299 300 301 302 303 304 305

// Gets restart count of given pod (total number of its containers restarts).
func getRestartCount(pod api.Pod) int {
	restartCount := 0
	for _, containerStatus := range pod.Status.ContainerStatuses {
		restartCount += containerStatus.RestartCount
	}
	return restartCount
}
306 307 308

// Returns internal endpoint name for the given service properties, e.g.,
// "my-service.namespace 80/TCP" or "my-service 53/TCP,53/UDP".
M
Marcin Maciaszczyk 已提交
309
func getInternalEndpoint(serviceName, namespace string, ports []api.ServicePort) Endpoint {
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324

	name := serviceName
	if namespace != api.NamespaceDefault {
		bufferName := bytes.NewBufferString(name)
		bufferName.WriteString(".")
		bufferName.WriteString(namespace)
		name = bufferName.String()
	}

	return Endpoint{
		Host:  name,
		Ports: getServicePorts(ports),
	}
}

M
Marcin Maciaszczyk 已提交
325 326 327 328 329 330 331 332
// Returns array of external endpoints for a replication controller.
func getExternalEndpoints(replicationController api.ReplicationController, pods []api.Pod,
	service api.Service, getNodeFn GetNodeFunc) []Endpoint {
	var externalEndpoints []Endpoint
	replicationControllerPods := filterReplicationControllerPods(replicationController, pods)

	if service.Spec.Type == api.ServiceTypeNodePort {
		externalEndpoints = getNodePortEndpoints(replicationControllerPods, service, getNodeFn)
333
	} else if service.Spec.Type == api.ServiceTypeLoadBalancer {
M
Marcin Maciaszczyk 已提交
334 335 336 337 338 339 340 341 342 343
		for _, ingress := range service.Status.LoadBalancer.Ingress {
			externalEndpoints = append(externalEndpoints, getExternalEndpoint(ingress,
				service.Spec.Ports))
		}

		if len(externalEndpoints) == 0 {
			externalEndpoints = getNodePortEndpoints(replicationControllerPods, service, getNodeFn)
		}
	}

344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
	if len(externalEndpoints) == 0 && (service.Spec.Type == api.ServiceTypeNodePort ||
		service.Spec.Type == api.ServiceTypeLoadBalancer) {
		externalEndpoints = getLocalhostEndpoints(service)
	}

	return externalEndpoints
}

// Returns localhost endpoints for specified node port or load balancer service.
func getLocalhostEndpoints(service api.Service) []Endpoint {
	var externalEndpoints []Endpoint
	for _, port := range service.Spec.Ports {
		externalEndpoints = append(externalEndpoints, Endpoint{
			Host: "localhost",
			Ports: []ServicePort{
				{
					Protocol: port.Protocol,
					Port:     port.NodePort,
				},
			},
		})
	}
M
Marcin Maciaszczyk 已提交
366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420
	return externalEndpoints
}

// Returns pods that belong to specified replication controller.
func filterReplicationControllerPods(replicationController api.ReplicationController,
	allPods []api.Pod) []api.Pod {
	var pods []api.Pod
	for _, pod := range allPods {
		if isLabelSelectorMatching(replicationController.Spec.Selector, pod.Labels) {
			pods = append(pods, pod)
		}
	}
	return pods
}

// Returns array of external endpoints for specified pods.
func getNodePortEndpoints(pods []api.Pod, service api.Service, getNodeFn GetNodeFunc) []Endpoint {
	var externalEndpoints []Endpoint
	var externalIPs []string
	for _, pod := range pods {
		node, err := getNodeFn(pod.Spec.NodeName)
		if err != nil {
			continue
		}
		for _, adress := range node.Status.Addresses {
			if adress.Type == api.NodeExternalIP && len(adress.Address) > 0 &&
				isExternalIPUniqe(externalIPs, adress.Address) {
				externalIPs = append(externalIPs, adress.Address)
				for _, port := range service.Spec.Ports {
					externalEndpoints = append(externalEndpoints, Endpoint{
						Host: adress.Address,
						Ports: []ServicePort{
							{
								Protocol: port.Protocol,
								Port:     port.NodePort,
							},
						},
					})
				}
			}
		}
	}
	return externalEndpoints
}

// Returns true if given external IP is not part of given array.
func isExternalIPUniqe(externalIPs []string, externalIP string) bool {
	for _, h := range externalIPs {
		if h == externalIP {
			return false
		}
	}
	return true
}

421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
// Returns external endpoint name for the given service properties.
func getExternalEndpoint(ingress api.LoadBalancerIngress, ports []api.ServicePort) Endpoint {
	var host string
	if ingress.Hostname != "" {
		host = ingress.Hostname
	} else {
		host = ingress.IP
	}
	return Endpoint{
		Host:  host,
		Ports: getServicePorts(ports),
	}
}

// Gets human readable name for the given service ports list.
func getServicePorts(apiPorts []api.ServicePort) []ServicePort {
	var ports []ServicePort
	for _, port := range apiPorts {
		ports = append(ports, ServicePort{port.Port, port.Protocol})
	}
	return ports
}