allocator.go 4.7 KB
Newer Older
L
liqingping 已提交
1 2 3 4 5 6
package allocator

import (
	"context"
	"time"

7 8
	corev1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
L
liqingping 已提交
9 10 11 12 13 14 15
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/source"

	ditypes "opendilab.org/di-orchestrator/pkg/allocator/types"
16
	div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1"
L
liqingping 已提交
17 18
	dihandler "opendilab.org/di-orchestrator/pkg/common/handler"
	dicontext "opendilab.org/di-orchestrator/pkg/context"
19
	diutil "opendilab.org/di-orchestrator/pkg/utils"
L
liqingping 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
)

// Allocator is the controller that handles DIJob reconcile requests and job
// creation events, delegating allocation decisions to a FitPolicy.
type Allocator struct {
	// Scheme is the runtime scheme handed to NewAllocator.
	Scheme           *runtime.Scheme
	// ctx provides the logger, object reads, node listing and DIJob status
	// updates used by the allocator's methods.
	ctx              dicontext.Context
	// policy is consulted for single-job allocation (Allocate) and for
	// optimization over a set of jobs (Optimize).
	policy           ditypes.FitPolicy
	// scheduleDuration is the interval compared against the time elapsed since
	// last to decide whether a reconcile request is processed or skipped.
	scheduleDuration time.Duration
	// last is the time of construction or of the most recent reconcile that was
	// not skipped.
	last             time.Time
}

// NewAllocator builds an Allocator from the given scheme, context, fit policy
// and schedule duration. The last-reconcile timestamp starts at the moment of
// construction.
func NewAllocator(scheme *runtime.Scheme, ctx dicontext.Context, policy ditypes.FitPolicy, scheduleDuration time.Duration) *Allocator {
	alloc := new(Allocator)
	alloc.Scheme = scheme
	alloc.ctx = ctx
	alloc.policy = policy
	alloc.scheduleDuration = scheduleDuration
	alloc.last = time.Now()
	return alloc
}

func (a *Allocator) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
41
	log := a.ctx.Log.WithName("Reconcile").WithValues("job", req.NamespacedName)
L
liqingping 已提交
42
	if !a.needReconcile() {
43
		log.V(2).Info("skipped reconcile since scheduling duration not meet")
L
liqingping 已提交
44 45 46 47 48
		return ctrl.Result{}, nil
	}
	a.updateLastTime()

	jobkey := req.NamespacedName
49
	job := &div2alpha1.DIJob{}
L
liqingping 已提交
50 51 52 53
	if err := a.ctx.Get(ctx, jobkey, job); err != nil {
		return ctrl.Result{}, err
	}

54 55 56 57 58
	jobinfo, err := getJobInfo(job)
	if err != nil {
		log.Error(err, "get jobinfo failed")
		return ctrl.Result{}, err
	}
59 60 61 62 63 64 65 66 67 68 69 70
	nodes, err := a.ctx.ListNodes()
	if err != nil {
		log.Error(err, "list nodes failed")
		return ctrl.Result{}, err
	}

	nodeinfos, err := a.getNodeInfos(nodes)
	if err != nil {
		log.Error(err, "list nodeinfos failed")
		return ctrl.Result{}, err
	}
	log.V(2).Info("get", "nodeinfos", nodeinfos)
L
liqingping 已提交
71 72 73 74 75 76 77 78 79 80 81 82 83
	jobinfos := map[string]ditypes.JobInfo{
		jobinfo.Key.String(): jobinfo,
	}
	prevAllocations := map[string]ditypes.NodeList{}
	if err := a.allocateAll(jobinfos, nodeinfos, prevAllocations); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (a *Allocator) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
84
		For(&div2alpha1.DIJob{}).
L
liqingping 已提交
85
		Watches(
86
			&source.Kind{Type: &div2alpha1.DIJob{}},
L
liqingping 已提交
87 88
			&dihandler.EventHandler{
				OnCreateHandlers: []func(obj client.Object){
89
					a.onJobAddHandler,
L
liqingping 已提交
90 91 92 93
				},
			},
			builder.Predicates{},
		).
94 95 96 97 98 99 100 101
		Watches(
			&source.Kind{Type: &corev1.Node{}},
			&dihandler.EventHandler{},
		).
		Watches(
			&source.Kind{Type: &corev1.Pod{}},
			&dihandler.EventHandler{},
		).
L
liqingping 已提交
102 103 104
		Complete(a)
}

105 106
// onJobAddHandler handle the event when a job is created.
func (a *Allocator) onJobAddHandler(obj client.Object) {
107
	log := a.ctx.Log.WithName("onJobAddHandler").WithValues("job", diutil.NamespacedName(obj.GetNamespace(), obj.GetName()))
108
	job := obj.(*div2alpha1.DIJob)
L
liqingping 已提交
109

110
	if err := a.allocate(job); err != nil {
111
		log.Error(err, "failed to allocate")
112 113 114
	}
}

115 116 117
// return true if time elapsed is almost greater than schedule duration.
func (a *Allocator) needReconcile() bool {
	return (a.scheduleDuration - time.Since(a.last)) < time.Second
L
liqingping 已提交
118 119
}

120 121
func (a *Allocator) updateLastTime() {
	a.last = time.Now()
122 123
}

124
func (a *Allocator) allocate(job *div2alpha1.DIJob) error {
125
	log := a.ctx.Log.WithName("allocate").WithValues("job", diutil.NamespacedName(job.Namespace, job.Name))
126 127 128
	status := job.Status.DeepCopy()
	// allocate job if preemptible, otherwise just update status.replicas
	if job.Spec.Preemptible {
129 130 131 132 133
		jobinfo, err := getJobInfo(job)
		if err != nil {
			log.Error(err, "get jobinfo failed")
			return err
		}
134 135 136 137 138 139 140 141
		nodes, err := a.ctx.ListNodes()
		if err != nil {
			return err
		}
		nodeinfos, err := a.getNodeInfos(nodes)
		if err != nil {
			return err
		}
142 143 144 145
		allocation, err := a.policy.Allocate(jobinfo, nodeinfos)
		if err != nil {
			return err
		}
146
		log.Info("successfully allocate", "allocation", allocation)
147 148 149 150 151 152 153 154 155 156 157
		if len(allocation) != 0 {
			job.Status.Allocation = allocation
		}
	} else {
		job.Status.Replicas = job.Spec.MinReplicas
	}

	if !apiequality.Semantic.DeepEqual(job.Status, *status) {
		if err := a.ctx.UpdateDIJobStatusInCluster(job); err != nil {
			return err
		}
L
liqingping 已提交
158 159 160 161
	}
	return nil
}

162
func (a *Allocator) allocateAll(jobinfos map[string]ditypes.JobInfo, nodeinfos map[string]*ditypes.NodeInfo, prevAllocations map[string]ditypes.NodeList) error {
L
liqingping 已提交
163 164 165 166 167
	log := a.ctx.Log.WithName("allocateAll")
	allocations, err := a.policy.Optimize(jobinfos, nodeinfos, prevAllocations)
	if err != nil {
		return err
	}
168
	log.Info("successfully allocate all", "allocations", allocations)
L
liqingping 已提交
169 170
	return nil
}