util.go 9.2 KB
Newer Older
L
liqingping 已提交
1 2 3 4 5 6 7 8 9 10
package util

import (
	"context"
	"fmt"
	"math/rand"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
11
	"k8s.io/apimachinery/pkg/api/resource"
L
liqingping 已提交
12 13 14 15 16 17 18
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	utilrand "k8s.io/apimachinery/pkg/util/rand"
	"sigs.k8s.io/controller-runtime/pkg/client"

19
	div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2"
L
liqingping 已提交
20 21
	dicommon "opendilab.org/di-orchestrator/pkg/common"
	commontypes "opendilab.org/di-orchestrator/pkg/common/types"
L
liqingping 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34 35
)

const (
	// randomLength is the length of the random string that GenerateName
	// appends after the "-" separator.
	randomLength = 5
)

// init seeds the global math/rand source with the current time in
// nanoseconds when the package is initialized.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}

// GenerateName returns name followed by "-" and a random string of
// randomLength characters produced by utilrand.String.
func GenerateName(name string) string {
	suffix := utilrand.String(randomLength)
	return fmt.Sprintf("%s-%s", name, suffix)
}

L
liqingping 已提交
36 37 38 39 40 41 42 43 44 45 46 47
// NamespacedName joins namespace and name into a single "namespace/name" key.
// Neither part is validated or escaped.
func NamespacedName(namespace, name string) string {
	const keyFormat = "%s/%s"
	key := fmt.Sprintf(keyFormat, namespace, name)
	return key
}

// SplitNamespaceName parses a "namespace/name" key into a
// types.NamespacedName. The input must contain exactly one "/" separator;
// any other shape yields an error together with a zero-value result.
func SplitNamespaceName(namespaceName string) (types.NamespacedName, error) {
	var key types.NamespacedName

	parts := strings.Split(namespaceName, "/")
	if len(parts) != 2 {
		return key, fmt.Errorf("Invalid namespace, name %s", namespaceName)
	}

	key.Namespace, key.Name = parts[0], parts[1]
	return key, nil
}

L
liqingping 已提交
48 49 50 51 52 53 54 55 56 57 58 59 60
// GetObjectFromUnstructured converts obj, which must be a non-nil
// *unstructured.Unstructured, into the typed object pointed to by dest using
// runtime.DefaultUnstructuredConverter.
//
// It returns an error when obj is not a *unstructured.Unstructured, when obj
// is a nil pointer of that type, or when the conversion itself fails.
func GetObjectFromUnstructured(obj interface{}, dest interface{}) error {
	us, ok := obj.(*unstructured.Unstructured)
	if !ok {
		// obj can be of any type, so format it with %v; %s would render
		// non-string values as "%!s(...)".
		return fmt.Errorf("the object %v is not unstructured", obj)
	}
	if us == nil {
		// A typed nil pointer passes the type assertion above; reject it
		// before its content is read.
		return fmt.Errorf("the unstructured object is nil")
	}

	return runtime.DefaultUnstructuredConverter.FromUnstructured(us.UnstructuredContent(), dest)
}

L
liqingping 已提交
61
func GetDefaultPortFromPod(pod *corev1.Pod) (int32, bool) {
L
liqingping 已提交
62
	for _, c := range pod.Spec.Containers {
L
liqingping 已提交
63
		if c.Name != dicommon.DefaultContainerName {
L
liqingping 已提交
64 65 66
			continue
		}
		for _, port := range c.Ports {
L
liqingping 已提交
67
			if port.Name == dicommon.DefaultPortName {
L
liqingping 已提交
68 69 70 71 72 73 74
				return port.ContainerPort, true
			}
		}
	}
	return -1, false
}

75
func AddPortToPod(pod *corev1.Pod, port corev1.ContainerPort) {
L
liqingping 已提交
76
	for i := range pod.Spec.Containers {
L
liqingping 已提交
77
		if pod.Spec.Containers[i].Name != dicommon.DefaultContainerName {
L
liqingping 已提交
78 79 80 81 82
			continue
		}
		if pod.Spec.Containers[i].Ports == nil {
			pod.Spec.Containers[i].Ports = []corev1.ContainerPort{}
		}
83
		pod.Spec.Containers[i].Ports = append(pod.Spec.Containers[i].Ports, port)
L
liqingping 已提交
84 85 86 87
	}
}

func GenLabels(jobName string) map[string]string {
88
	groupName := div1alpha2.GroupVersion.Group
L
liqingping 已提交
89
	return map[string]string{
90 91 92
		dicommon.LabelGroup:    groupName,
		dicommon.LabelJob:      strings.Replace(jobName, "/", "-", -1),
		dicommon.LabelOperator: dicommon.OperatorName,
L
liqingping 已提交
93 94 95
	}
}

96 97 98 99
// GenPodName returns the pod name "<jobName>-<group>-<rank>".
func GenPodName(jobName string, group, rank int) string {
	const podNameFormat = "%s-%d-%d"
	podName := fmt.Sprintf(podNameFormat, jobName, group, rank)
	return podName
}

L
liqingping 已提交
100 101 102 103 104 105 106 107 108
// AddLabelsToPod copies every entry of labels into the pod's labels,
// overwriting existing keys. The pod's label map is created when it is nil.
func AddLabelsToPod(pod *corev1.Pod, labels map[string]string) {
	meta := &pod.ObjectMeta
	if meta.Labels == nil {
		meta.Labels = map[string]string{}
	}
	for key, value := range labels {
		meta.Labels[key] = value
	}
}

109 110 111 112 113 114 115 116 117
// AddAnnotationsToPod copies every entry of annotations into the pod's
// annotations, overwriting existing keys. The pod's annotation map is created
// when it is nil.
func AddAnnotationsToPod(pod *corev1.Pod, annotations map[string]string) {
	target := pod.ObjectMeta.Annotations
	if target == nil {
		target = make(map[string]string)
		pod.ObjectMeta.Annotations = target
	}
	for key, value := range annotations {
		target[key] = value
	}
}

L
liqingping 已提交
118 119 120 121 122 123
func AddEnvsToPod(pod *corev1.Pod, envs map[string]string) {
	for i := range pod.Spec.Containers {
		if len(pod.Spec.Containers[i].Env) == 0 {
			pod.Spec.Containers[i].Env = make([]corev1.EnvVar, 0)
		}
		for k, v := range envs {
124
			env := corev1.EnvVar{Name: k, Value: v}
L
liqingping 已提交
125 126 127 128 129
			pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, env)
		}
	}
}

130 131 132 133 134 135 136 137 138 139 140 141 142 143
// GetEnvFromPod returns the Value of the environment variable envName on the
// container named dicommon.DefaultContainerName, and true when it is present.
// It returns "" and false when no such variable is found.
func GetEnvFromPod(pod *corev1.Pod, envName string) (string, bool) {
	containers := pod.Spec.Containers
	for i := range containers {
		if containers[i].Name == dicommon.DefaultContainerName {
			envs := containers[i].Env
			for j := range envs {
				if envs[j].Name == envName {
					return envs[j].Value, true
				}
			}
		}
	}
	return "", false
}

144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
// CountPodsScheduled counts, across all given pods, the status conditions of
// type PodScheduled whose status is True.
func CountPodsScheduled(pods []*corev1.Pod) int {
	scheduled := 0
	for _, pod := range pods {
		conditions := pod.Status.Conditions
		for i := range conditions {
			if conditions[i].Type != corev1.PodScheduled {
				continue
			}
			if conditions[i].Status == corev1.ConditionTrue {
				scheduled++
			}
		}
	}
	return scheduled
}

// CountReadyPods returns the number of pods whose containers are all ready.
//
// A pod is counted at most once, and only when it reports at least one
// container status and every reported status has Ready set. Counting per pod
// rather than per container keeps the result bounded by len(pods) for pods
// that run more than one container.
func CountReadyPods(pods []*corev1.Pod) int {
	count := 0
	for _, pod := range pods {
		statuses := pod.Status.ContainerStatuses
		if len(statuses) == 0 {
			// No container has reported a status, so the pod is not ready.
			continue
		}

		allReady := true
		for i := range statuses {
			if !statuses[i].Ready {
				allReady = false
				break
			}
		}
		if allReady {
			count++
		}
	}
	return count
}

168
func BuildService(name, namespace string, ownRefer metav1.OwnerReference, labels map[string]string, port int32) *corev1.Service {
L
liqingping 已提交
169 170
	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
171 172 173 174
			Name:            name,
			Namespace:       namespace,
			Labels:          labels,
			OwnerReferences: []metav1.OwnerReference{ownRefer},
L
liqingping 已提交
175 176 177 178 179 180 181 182
		},
		Spec: corev1.ServiceSpec{
			Type:      corev1.ServiceTypeClusterIP,
			ClusterIP: "None",
			Selector:  labels,
			Ports: []corev1.ServicePort{
				{
					Port: port,
L
liqingping 已提交
183
					Name: dicommon.DefaultPortName,
L
liqingping 已提交
184 185 186 187 188 189 190 191 192 193 194 195 196
				},
			},
		},
	}

	return svc
}

// ConcatURL formats an address of the form "<name>.<ns>:<port>".
func ConcatURL(name, ns string, port int32) string {
	const addrFormat = "%s.%s:%d"
	addr := fmt.Sprintf(addrFormat, name, ns, port)
	return addr
}

func GetPodAccessURL(pod *corev1.Pod, defaultPort int32) string {
L
liqingping 已提交
197
	port, found := GetDefaultPortFromPod(pod)
L
liqingping 已提交
198 199 200 201 202 203 204 205 206
	if !found {
		port = defaultPort
	}
	return ConcatURL(pod.Name, pod.Namespace, port)
}

func GetServiceAccessURL(service *corev1.Service) string {
	url := ""
	for _, port := range service.Spec.Ports {
L
liqingping 已提交
207
		if port.Name == dicommon.DefaultPortName {
L
liqingping 已提交
208 209 210 211 212 213 214
			url = ConcatURL(service.Name, service.Namespace, port.Port)
			break
		}
	}
	return url
}

215
func ListPods(ctx context.Context, cli client.Client, job *div1alpha2.DIJob) ([]*corev1.Pod, error) {
L
liqingping 已提交
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
	podList := &corev1.PodList{}

	// generate label selector
	labelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
		MatchLabels: GenLabels(job.Name),
	})
	if err != nil {
		return nil, err
	}

	// list pods of job
	err = cli.List(ctx, podList, &client.ListOptions{Namespace: job.Namespace, LabelSelector: labelSelector})
	if err != nil {
		return nil, err
	}

	pods := []*corev1.Pod{}
	for _, pod := range podList.Items {
		pods = append(pods, pod.DeepCopy())
	}
	return pods, nil
}

239
func ListServices(ctx context.Context, cli client.Client, job *div1alpha2.DIJob) ([]*corev1.Service, error) {
L
liqingping 已提交
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
	svcList := &corev1.ServiceList{}

	// generate label selector
	labelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
		MatchLabels: GenLabels(job.Name),
	})
	if err != nil {
		return nil, err
	}

	// list svcs of job
	err = cli.List(ctx, svcList, &client.ListOptions{Namespace: job.Namespace, LabelSelector: labelSelector})
	if err != nil {
		return nil, err
	}

	svcs := []*corev1.Service{}
	for _, svc := range svcList.Items {
		svcs = append(svcs, svc.DeepCopy())
	}
	return svcs, nil
}

263 264
func FilterOutTerminatingPods(pods []*corev1.Pod) []*corev1.Pod {
	results := []*corev1.Pod{}
L
liqingping 已提交
265
	for _, pod := range pods {
266
		if IsTerminating(pod) {
L
liqingping 已提交
267 268
			continue
		}
269
		results = append(results, pod)
L
liqingping 已提交
270 271
	}

272 273
	return results
}
L
liqingping 已提交
274

275 276 277 278
// IsTerminating reports whether the pod's DeletionTimestamp has been set.
func IsTerminating(pod *corev1.Pod) bool {
	deletionTimestamp := pod.ObjectMeta.DeletionTimestamp
	return deletionTimestamp != nil
}
L
liqingping 已提交
279

280 281 282
func SetPodResources(pod *corev1.Pod, resources commontypes.ResourceQuantity) {
	for i := range pod.Spec.Containers {
		if pod.Spec.Containers[i].Name != dicommon.DefaultContainerName {
L
liqingping 已提交
283 284
			continue
		}
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
		if pod.Spec.Containers[i].Resources.Limits == nil {
			pod.Spec.Containers[i].Resources.Limits = make(corev1.ResourceList)
		}
		if pod.Spec.Containers[i].Resources.Requests == nil {
			pod.Spec.Containers[i].Resources.Requests = make(corev1.ResourceList)
		}

		// cpu and memory must not be zero
		if !resources.CPU.IsZero() {
			pod.Spec.Containers[i].Resources.Limits[corev1.ResourceCPU] = resources.CPU
			pod.Spec.Containers[i].Resources.Requests[corev1.ResourceCPU] = resources.CPU
		}
		if !resources.Memory.IsZero() {
			pod.Spec.Containers[i].Resources.Limits[corev1.ResourceMemory] = resources.Memory
			pod.Spec.Containers[i].Resources.Requests[corev1.ResourceMemory] = resources.Memory
		}
		if !resources.GPU.IsZero() {
			pod.Spec.Containers[i].Resources.Limits[corev1.ResourceName("nvidia.com/gpu")] = resources.GPU
			pod.Spec.Containers[i].Resources.Requests[corev1.ResourceName("nvidia.com/gpu")] = resources.GPU
		}
L
liqingping 已提交
305 306
	}
}
L
liqingping 已提交
307

308 309 310 311 312 313 314 315
func GetPodResources(pod *corev1.Pod) commontypes.ResourceQuantity {
	resource := commontypes.ResourceQuantity{
		CPU:    resource.MustParse("0"),
		GPU:    resource.MustParse("0"),
		Memory: resource.MustParse("0"),
	}
	for _, container := range pod.Spec.Containers {
		if container.Name != dicommon.DefaultContainerName {
L
liqingping 已提交
316 317
			continue
		}
318 319 320 321 322 323 324 325 326 327 328 329 330
		if container.Resources.Limits == nil && container.Resources.Requests == nil {
			break
		}
		if container.Resources.Requests != nil {
			resource.CPU = container.Resources.Requests[corev1.ResourceCPU].DeepCopy()
			resource.GPU = container.Resources.Requests[corev1.ResourceName("nvidia.com/gpu")].DeepCopy()
			resource.Memory = container.Resources.Requests[corev1.ResourceMemory].DeepCopy()
		}
		if container.Resources.Limits != nil {
			resource.CPU = container.Resources.Limits[corev1.ResourceCPU].DeepCopy()
			resource.GPU = container.Resources.Limits[corev1.ResourceName("nvidia.com/gpu")].DeepCopy()
			resource.Memory = container.Resources.Limits[corev1.ResourceMemory].DeepCopy()
		}
L
liqingping 已提交
331
	}
332
	return resource
L
liqingping 已提交
333
}
334 335 336 337 338 339 340 341 342 343

// NewOwnerReference builds a metav1.OwnerReference from the given API version,
// kind, name and UID. The Controller field points at a copy of controller
// that is private to the returned value.
func NewOwnerReference(apiVersion, kind, name string, uid types.UID, controller bool) metav1.OwnerReference {
	isController := controller

	var ref metav1.OwnerReference
	ref.APIVersion = apiVersion
	ref.Kind = kind
	ref.Name = name
	ref.UID = uid
	ref.Controller = &isController
	return ref
}