Commit 6d4fc437 authored by liqingping

feat: add allocator

Parent d245b8e9
@@ -17,6 +17,7 @@ package operator
import (
"context"
"time"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/runtime"
@@ -26,26 +27,35 @@ import (
"sigs.k8s.io/controller-runtime/pkg/healthz"
cmdcommon "opendilab.org/di-orchestrator/cmd/common"
alloc "opendilab.org/di-orchestrator/pkg/allocator"
alloctypes "opendilab.org/di-orchestrator/pkg/allocator/types"
div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2"
dicontext "opendilab.org/di-orchestrator/pkg/context"
"opendilab.org/di-orchestrator/pkg/controllers"
"opendilab.org/di-orchestrator/pkg/handler"
)
type CreateOptions struct {
SyncPeriod *time.Duration
MetricAddress string
ProbeAddress string
EnableLeaderElection bool
}
func NewCreateOptions() *CreateOptions {
DefaultSyncPeriod := 1 * time.Minute
DefaultMetricAddress := ":8443"
DefaultProbeAddress := ":8080"
DefaultEnableLeaderElection := false
return &CreateOptions{
MetricAddress: ":8443",
ProbeAddress: ":8080",
EnableLeaderElection: false,
SyncPeriod: &DefaultSyncPeriod,
MetricAddress: DefaultMetricAddress,
ProbeAddress: DefaultProbeAddress,
EnableLeaderElection: DefaultEnableLeaderElection,
}
}
func (o *CreateOptions) AddFlags(cmd *cobra.Command) {
cmd.Flags().DurationVar(o.SyncPeriod, "sync-period", *o.SyncPeriod, "Resync period for controllers.")
cmd.Flags().StringVar(&o.MetricAddress, "metric-addr", o.MetricAddress, "The address the metric endpoint binds to.")
cmd.Flags().StringVar(&o.ProbeAddress, "probe-addr", o.ProbeAddress, "The address the probe endpoint binds to.")
cmd.Flags().BoolVar(&o.EnableLeaderElection, "leader-elect", o.EnableLeaderElection,
@@ -91,6 +101,7 @@ func runCommand(cmd *cobra.Command, options *CreateOptions) error {
config := ctrl.GetConfigOrDie()
mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
SyncPeriod: options.SyncPeriod,
MetricsBindAddress: options.MetricAddress,
HealthProbeBindAddress: options.ProbeAddress,
LeaderElection: options.EnableLeaderElection,
@@ -101,7 +112,7 @@ func runCommand(cmd *cobra.Command, options *CreateOptions) error {
return err
}
ctx := handler.NewContext(context.Background(),
ctx := dicontext.NewContext(context.Background(),
config,
mgr.GetClient(),
mgr.GetEventRecorderFor("di-operator"),
@@ -112,6 +123,17 @@ func runCommand(cmd *cobra.Command, options *CreateOptions) error {
return err
}
ctx = dicontext.NewContext(context.Background(),
config,
mgr.GetClient(),
mgr.GetEventRecorderFor("di-allocator"),
ctrl.Log.WithName("di-allocator"))
allocator := alloc.NewAllocator(mgr.GetScheme(), ctx, *alloctypes.NewFitPolicy(), *options.SyncPeriod)
if err = allocator.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create allocator", "allocator", "DIJob")
return err
}
//+kubebuilder:scaffold:builder
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
......
@@ -63,6 +63,26 @@ spec:
- All
- None
type: string
engineFields:
description: EngineFields defines features of the DI-engine framework.
properties:
parallelWorkers:
default: 1
description: ParallelWorkers defines the number of parallel workers
in each worker.
format: int32
minimum: 1
type: integer
topology:
default: star
description: Topology defines the topology among the workers of
the job.
enum:
- star
- alone
- mesh
type: string
type: object
group:
description: Group is a collection of DIJobs.
type: string
@@ -78,13 +98,6 @@ spec:
format: int32
minimum: 0
type: integer
parallelWorkers:
default: 1
description: ParallelWorkers defines the number of parallel workers
in each worker.
format: int32
minimum: 1
type: integer
preemptible:
default: false
description: Preemptible defines whether the dijob can be preempted.
@@ -6338,15 +6351,6 @@ spec:
- containers
type: object
type: object
topology:
default: star
description: Topology defines the topology among the workers of the
job.
enum:
- star
- alone
- mesh
type: string
required:
- template
type: object
......
@@ -74,6 +74,24 @@ spec:
- All
- None
type: string
engineFields:
description: EngineFields defines features of the DI-engine framework.
properties:
parallelWorkers:
default: 1
description: ParallelWorkers defines the number of parallel workers in each worker.
format: int32
minimum: 1
type: integer
topology:
default: star
description: Topology defines the topology among the workers of the job.
enum:
- star
- alone
- mesh
type: string
type: object
group:
description: Group is a collection of DIJobs.
type: string
@@ -87,12 +105,6 @@ spec:
format: int32
minimum: 0
type: integer
parallelWorkers:
default: 1
description: ParallelWorkers defines the number of parallel workers in each worker.
format: int32
minimum: 1
type: integer
preemptible:
default: false
description: Preemptible defines whether the dijob can be preempted.
@@ -3844,14 +3856,6 @@ spec:
- containers
type: object
type: object
topology:
default: star
description: Topology defines the topology among the workers of the job.
enum:
- star
- alone
- mesh
type: string
required:
- template
type: object
......
package allocator
import (
"context"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/source"
ditypes "opendilab.org/di-orchestrator/pkg/allocator/types"
div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2"
dihandler "opendilab.org/di-orchestrator/pkg/common/handler"
dicontext "opendilab.org/di-orchestrator/pkg/context"
)
type Allocator struct {
Scheme *runtime.Scheme
ctx dicontext.Context
policy ditypes.FitPolicy
scheduleDuration time.Duration
last time.Time
}
func NewAllocator(scheme *runtime.Scheme, ctx dicontext.Context, policy ditypes.FitPolicy, scheduleDuration time.Duration) *Allocator {
return &Allocator{
Scheme: scheme,
ctx: ctx,
policy: policy,
scheduleDuration: scheduleDuration,
last: time.Now(),
}
}
func (a *Allocator) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := a.ctx.Log.WithName("Reconcile")
if !a.needReconcile() {
log.Info("skipped reconcile since scheduling duration not meet")
return ctrl.Result{}, nil
}
a.updateLastTime()
jobkey := req.NamespacedName
job := &div1alpha2.DIJob{}
if err := a.ctx.Get(ctx, jobkey, job); err != nil {
return ctrl.Result{}, err
}
// TODO(liqingping): implement jobinfo getter and nodeinfo getter.
jobinfo := *ditypes.NewJobInfo(types.NamespacedName{
Namespace: job.Namespace, Name: job.Name,
})
nodeinfos := map[string]ditypes.NodeInfo{
"node-0": *ditypes.NewNodeInfo("node-0"),
}
jobinfos := map[string]ditypes.JobInfo{
jobinfo.Key.String(): jobinfo,
}
prevAllocations := map[string]ditypes.NodeList{}
if err := a.allocateAll(jobinfos, nodeinfos, prevAllocations); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (a *Allocator) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&div1alpha2.DIJob{}).
Watches(
&source.Kind{Type: &div1alpha2.DIJob{}},
&dihandler.EventHandler{
OnCreateHandlers: []func(obj client.Object){
a.onJobAdd,
},
OnUpdateHandlers: []func(old, new client.Object){},
},
builder.Predicates{},
).
Complete(a)
}
// needReconcile returns true once less than one second remains before the next scheduled allocation run.
func (a *Allocator) needReconcile() bool {
return (a.scheduleDuration - time.Since(a.last)) < time.Second
}
func (a *Allocator) updateLastTime() {
a.last = time.Now()
}
func (a *Allocator) onJobAdd(obj client.Object) {
log := a.ctx.Log.WithName("onJobAdd")
job := obj.(*div1alpha2.DIJob)
// TODO(liqingping): implement jobinfo getter and nodeinfo getter.
jobinfo := *ditypes.NewJobInfo(types.NamespacedName{
Namespace: job.Namespace, Name: job.Name,
})
nodeinfos := map[string]ditypes.NodeInfo{
"node-0": *ditypes.NewNodeInfo("node-0"),
}
if err := a.allocate(jobinfo, nodeinfos); err != nil {
log.Error(err, "failed to allocate", "job", jobinfo.Key.String())
}
}
func (a *Allocator) allocate(jobinfo ditypes.JobInfo, nodeinfos map[string]ditypes.NodeInfo) error {
log := a.ctx.Log.WithName("Allocate")
allocation, err := a.policy.Allocate(jobinfo, nodeinfos)
if err != nil {
return err
}
log.Info("new allocation", "allocation", allocation)
return nil
}
func (a *Allocator) allocateAll(jobinfos map[string]ditypes.JobInfo, nodeinfos map[string]ditypes.NodeInfo, prevAllocations map[string]ditypes.NodeList) error {
log := a.ctx.Log.WithName("allocateAll")
allocations, err := a.policy.Optimize(jobinfos, nodeinfos, prevAllocations)
if err != nil {
return err
}
log.Info("new allocations", "allocations", allocations)
return nil
}
package types
// FitPolicy is an implementation of Policy interface.
type FitPolicy struct{}
func NewFitPolicy() *FitPolicy {
return &FitPolicy{}
}
func (p FitPolicy) Allocate(job JobInfo, nodes map[string]NodeInfo) (NodeList, error) {
return NodeList{}, nil
}
func (p FitPolicy) Optimize(jobs map[string]JobInfo, nodes map[string]NodeInfo, prevAllocations map[string]NodeList) (map[string]NodeList, error) {
return map[string]NodeList{}, nil
}
package types
import (
apitypes "k8s.io/apimachinery/pkg/types"
)
type JobInfo struct {
Key apitypes.NamespacedName
}
func NewJobInfo(key apitypes.NamespacedName) *JobInfo {
return &JobInfo{Key: key}
}
type NodeInfo struct {
Key string
}
func NewNodeInfo(key string) *NodeInfo {
return &NodeInfo{Key: key}
}
package types
type NodeList []string
// Policy interface defines two functions to handle single job allocation and global jobs optimization.
type Policy interface {
Allocate(job JobInfo, nodes map[string]NodeInfo) (NodeList, error)
Optimize(jobs map[string]JobInfo, nodes map[string]NodeInfo, prevAllocations map[string]NodeList) (map[string]NodeList, error)
}
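The FitPolicy above is still a stub for the Policy interface. As an illustration only (not part of this commit), a minimal policy satisfying the interface could look like the sketch below; firstNodePolicy, its placement rule, and the "sort" import are assumptions made for the example, and it is assumed to live in the same types package.

import "sort"

// firstNodePolicy is a hypothetical Policy implementation shown only to
// illustrate the interface: it pins every job to the lexicographically
// smallest node key and ignores previous allocations.
type firstNodePolicy struct{}

func (p firstNodePolicy) Allocate(job JobInfo, nodes map[string]NodeInfo) (NodeList, error) {
	keys := make([]string, 0, len(nodes))
	for key := range nodes {
		keys = append(keys, key)
	}
	if len(keys) == 0 {
		return NodeList{}, nil
	}
	sort.Strings(keys)
	return NodeList{keys[0]}, nil
}

func (p firstNodePolicy) Optimize(jobs map[string]JobInfo, nodes map[string]NodeInfo, prevAllocations map[string]NodeList) (map[string]NodeList, error) {
	allocations := map[string]NodeList{}
	for key, job := range jobs {
		// Reuse the single-job rule for every job; a real policy would
		// balance jobs against each other and against prevAllocations.
		allocation, err := p.Allocate(job, nodes)
		if err != nil {
			return nil, err
		}
		allocations[key] = allocation
	}
	return allocations, nil
}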
@@ -37,15 +37,8 @@ type DIJobSpec struct {
// +kubebuilder:validation:Enum=normal;high
Priority Priority `json:"priority,omitempty"`
// Topology defines the topology among the workers of the job.
// +kubebuilder:default=star
// +kubebuilder:validation:Enum=star;alone;mesh
Topology Topology `json:"topology,omitempty"`
// ParallelWorkers defines the number of parallel workers in each worker.
// +kubebuilder:default=1
// +kubebuilder:validation:Minimum=1
ParallelWorkers int32 `json:"parallelWorkers,omitempty"`
// EngineFields defines features of the DI-engine framework.
EngineFields EngineFields `json:"engineFields,omitempty"`
// CleanPodPolicy defines the policy to clean pods after DIJob completed.
// +kubebuilder:default=Running
@@ -69,6 +62,18 @@ type DIJobSpec struct {
Template corev1.PodTemplateSpec `json:"template"`
}
type EngineFields struct {
// Topology defines the topology among the workers of the job.
// +kubebuilder:default=star
// +kubebuilder:validation:Enum=star;alone;mesh
Topology Topology `json:"topology,omitempty"`
// ParallelWorkers defines the number of parallel workers in each worker.
// +kubebuilder:default=1
// +kubebuilder:validation:Minimum=1
ParallelWorkers int32 `json:"parallelWorkers,omitempty"`
}
// Priority defines the priority of DIJob
type Priority string
......
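Since Topology and ParallelWorkers moved under EngineFields, Go callers that build DIJob objects must nest those fields accordingly. A minimal sketch, where the job name, namespace, and container spec are placeholders invented for the example:

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2"
)

// buildExampleDIJob shows the relocated engine fields in use.
func buildExampleDIJob() *div1alpha2.DIJob {
	return &div1alpha2.DIJob{
		ObjectMeta: metav1.ObjectMeta{Name: "example-dijob", Namespace: "default"},
		Spec: div1alpha2.DIJobSpec{
			// Previously job.Spec.Topology and job.Spec.ParallelWorkers.
			EngineFields: div1alpha2.EngineFields{
				Topology:        div1alpha2.TopologyStar,
				ParallelWorkers: 2,
			},
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{Name: "di-container", Image: "example/di-engine:latest"},
					},
				},
			},
		},
	}
}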
@@ -21,7 +21,7 @@ limitations under the License.
package v1alpha2
import (
"k8s.io/apimachinery/pkg/runtime"
runtime "k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
@@ -103,6 +103,7 @@ func (in *DIJobList) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DIJobSpec) DeepCopyInto(out *DIJobSpec) {
*out = *in
out.EngineFields = in.EngineFields
in.Template.DeepCopyInto(&out.Template)
}
@@ -148,6 +149,21 @@ func (in *DIJobStatus) DeepCopy() *DIJobStatus {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EngineFields) DeepCopyInto(out *EngineFields) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EngineFields.
func (in *EngineFields) DeepCopy() *EngineFields {
if in == nil {
return nil
}
out := new(EngineFields)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Profilings) DeepCopyInto(out *Profilings) {
*out = *in
......
package handler
import (
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
)
type EventHandler struct {
OnCreateHandlers []func(obj client.Object)
OnUpdateHandlers []func(old client.Object, new client.Object)
OnDeleteHandlers []func(obj client.Object)
}
// Create implements EventHandler
func (e *EventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
for _, handler := range e.OnCreateHandlers {
handler(evt.Object)
}
}
// Update implements EventHandler
func (e *EventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
for _, handler := range e.OnUpdateHandlers {
handler(evt.ObjectOld, evt.ObjectNew)
}
}
// Delete implements EventHandler
func (e *EventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
for _, handler := range e.OnDeleteHandlers {
handler(evt.Object)
}
}
// Generic implements EventHandler
func (e *EventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
}
package handler
package context
import (
"context"
@@ -17,8 +17,8 @@ type Context struct {
Recorder record.EventRecorder
}
func NewContext(ctx context.Context, config *rest.Config, client client.Client, recorder record.EventRecorder, logger logr.Logger) *Context {
return &Context{
func NewContext(ctx context.Context, config *rest.Config, client client.Client, recorder record.EventRecorder, logger logr.Logger) Context {
return Context{
config: config,
ctx: ctx,
Client: client,
......
package handler
package context
import (
"time"
......
package handler
package context
import (
"fmt"
......
package handler
package context
import (
"context"
@@ -115,7 +115,7 @@ func (c *Context) DetectRestart(job *div1alpha2.DIJob, pods []*corev1.Pod, alloc
return false
}
func (c *Context) markIncorrectJobFailed(obj client.Object) {
func (c *Context) MarkIncorrectJobFailed(obj client.Object) {
log := c.Log.WithName("markIncorrectJobFailed")
dclient, err := dynamic.NewForConfig(c.config)
if err != nil {
......
package handler
package context
import (
"fmt"
@@ -23,7 +23,7 @@ var (
func OnTopologyHandler(job *v1alpha2.DIJob, rank int, pod *corev1.Pod) {
envs := make(map[string]string)
pworkers := int(job.Spec.ParallelWorkers)
pworkers := int(job.Spec.EngineFields.ParallelWorkers)
ports := make([]int, pworkers)
nodeIDs := make([]int, pworkers)
for i := 0; i < pworkers; i++ {
@@ -35,7 +35,7 @@ func OnTopologyHandler(job *v1alpha2.DIJob, rank int, pod *corev1.Pod) {
return fmt.Sprintf("%s%s:%d", prefix, addr, port)
}
attachedNodesArgValue := ""
switch job.Spec.Topology {
switch job.Spec.EngineFields.Topology {
case v1alpha2.TopologyAlone:
// do nothing
case v1alpha2.TopologyStar:
......
@@ -8,7 +8,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2"
"opendilab.org/di-orchestrator/pkg/handler"
dicontext "opendilab.org/di-orchestrator/pkg/context"
diutil "opendilab.org/di-orchestrator/pkg/utils"
)
@@ -65,55 +65,55 @@ func (r *DIJobReconciler) reconcileWithJobStatus(ctx context.Context, job *div1a
case div1alpha2.JobPending:
if job.Spec.Preemptible {
if allocation != nil && len(pods) == 0 {
r.ctx.UpdateJobStatus(job, div1alpha2.JobStarting, handler.DIJobStartingReason, fmt.Sprintf("DIJob %s is starting now.", job.Name))
r.ctx.UpdateJobStatus(job, div1alpha2.JobStarting, dicontext.DIJobStartingReason, fmt.Sprintf("DIJob %s is starting now.", job.Name))
}
} else {
if replicas != 0 && len(pods) == 0 {
r.ctx.UpdateJobStatus(job, div1alpha2.JobStarting, handler.DIJobStartingReason, fmt.Sprintf("DIJob %s is starting now.", job.Name))
r.ctx.UpdateJobStatus(job, div1alpha2.JobStarting, dicontext.DIJobStartingReason, fmt.Sprintf("DIJob %s is starting now.", job.Name))
}
}
case div1alpha2.JobStarting:
if job.Spec.Preemptible {
if (diutil.CountPodsScheduled(pods) != replicas && r.ctx.DetectRestart(job, pods, allocation, replicas)) || allocation == nil {
r.ctx.UpdateJobStatus(job, div1alpha2.JobRestarting, handler.DIJobRestartingReason,
r.ctx.UpdateJobStatus(job, div1alpha2.JobRestarting, dicontext.DIJobRestartingReason,
fmt.Sprintf("DIJob %s is restarting since conditions changed.", job.Name))
} else if allocation != nil && len(pods) == 0 {
if err := r.buildAndCreatePods(job, replicas, allocation); err != nil {
return err
}
} else if len(pods) != replicas {
r.ctx.UpdateJobStatus(job, div1alpha2.JobRestarting, handler.DIJobRestartingReason,
r.ctx.UpdateJobStatus(job, div1alpha2.JobRestarting, dicontext.DIJobRestartingReason,
fmt.Sprintf("DIJob %s is restarting since the created pods %d are not matched replicas %d.", job.Name, len(pods), replicas))
} else if diutil.CountReadyPods(pods) == replicas {
r.ctx.UpdateJobStatus(job, div1alpha2.JobRunning, handler.DIJobRunningReason,
r.ctx.UpdateJobStatus(job, div1alpha2.JobRunning, dicontext.DIJobRunningReason,
fmt.Sprintf("DIJob %s is running since all pods are ready.", job.Name))
}
} else {
if diutil.CountPodsScheduled(pods) != replicas && r.ctx.DetectRestart(job, pods, allocation, replicas) {
r.ctx.UpdateJobStatus(job, div1alpha2.JobRestarting, handler.DIJobRestartingReason,
r.ctx.UpdateJobStatus(job, div1alpha2.JobRestarting, dicontext.DIJobRestartingReason,
fmt.Sprintf("DIJob %s is restarting since conditions changed.", job.Name))
} else if replicas != 0 && len(pods) == 0 {
if err := r.buildAndCreatePods(job, replicas, allocation); err != nil {
return err
}
} else if len(pods) != replicas {
r.ctx.UpdateJobStatus(job, div1alpha2.JobRestarting, handler.DIJobRestartingReason,
r.ctx.UpdateJobStatus(job, div1alpha2.JobRestarting, dicontext.DIJobRestartingReason,
fmt.Sprintf("DIJob %s is restarting since the created pods %d are not matched replicas %d.", job.Name, len(pods), replicas))
} else if diutil.CountReadyPods(pods) == replicas {
r.ctx.UpdateJobStatus(job, div1alpha2.JobRunning, handler.DIJobRunningReason,
r.ctx.UpdateJobStatus(job, div1alpha2.JobRunning, dicontext.DIJobRunningReason,
fmt.Sprintf("DIJob %s is running since all pods are ready.", job.Name))
}
}
case div1alpha2.JobRunning:
if r.ctx.DetectRestart(job, pods, allocation, replicas) || len(pods) != replicas {
r.ctx.UpdateJobStatus(job, div1alpha2.JobRestarting, handler.DIJobRestartingReason,
r.ctx.UpdateJobStatus(job, div1alpha2.JobRestarting, dicontext.DIJobRestartingReason,
fmt.Sprintf("DIJob %s is restarting since the created pods %d are not matched replicas %d.", job.Name, len(pods), replicas))
}
case div1alpha2.JobRestarting:
if len(pods) != 0 {
r.ctx.DeletePods(job, pods)
} else {
r.ctx.UpdateJobStatus(job, div1alpha2.JobPending, handler.DIJobPendingReason,
r.ctx.UpdateJobStatus(job, div1alpha2.JobPending, dicontext.DIJobPendingReason,
fmt.Sprintf("DIJob %s is pending since job restarted.", job.Name))
}
}
@@ -131,7 +131,7 @@ func (r *DIJobReconciler) buildAndCreatePods(job *div1alpha2.DIJob, replicas int
err := r.ctx.CreatePod(job, pod)
if err != nil {
log.Error(err, "failed to create pod", "pod", pod.Name)
r.ctx.UpdateJobStatus(job, div1alpha2.JobFailed, handler.DIJobFailedReason,
r.ctx.UpdateJobStatus(job, div1alpha2.JobFailed, dicontext.DIJobFailedReason,
fmt.Sprintf("DIJob %s failed because pod %s failed to created.", job.Name, pod.Name))
return r.ctx.DeletePods(job, builtPods)
}
......
@@ -18,27 +18,31 @@ package controllers
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2"
dihandler "opendilab.org/di-orchestrator/pkg/handler"
dihandler "opendilab.org/di-orchestrator/pkg/common/handler"
dicontext "opendilab.org/di-orchestrator/pkg/context"
diutil "opendilab.org/di-orchestrator/pkg/utils"
)
// DIJobReconciler reconciles a DIJob object
type DIJobReconciler struct {
Scheme *runtime.Scheme
ctx *dihandler.Context
ctx dicontext.Context
}
func NewDIJobReconciler(scheme *runtime.Scheme, ctx *dihandler.Context) *DIJobReconciler {
func NewDIJobReconciler(scheme *runtime.Scheme, ctx dicontext.Context) *DIJobReconciler {
return &DIJobReconciler{
Scheme: scheme,
ctx: ctx,
@@ -108,8 +112,13 @@ func (r *DIJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&div1alpha2.DIJob{}).
Watches(
&source.Kind{Type: &div1alpha2.DIJob{}},
&dihandler.DIJobEventHandler{
Context: r.ctx,
&dihandler.EventHandler{
OnCreateHandlers: []func(obj client.Object){
r.onJobAdd,
},
OnDeleteHandlers: []func(obj client.Object){
r.onJobDelete,
},
},
builder.Predicates{},
).
@@ -130,3 +139,35 @@ func (r *DIJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
).
Complete(r)
}
// onJobAdd is the event handler responsible for handling job add events
func (r *DIJobReconciler) onJobAdd(obj client.Object) {
jobkey := diutil.NamespacedName(obj.GetNamespace(), obj.GetName())
log := r.ctx.Log.WithValues("dijob", jobkey)
job, ok := obj.(*div1alpha2.DIJob)
if !ok {
log.Error(fmt.Errorf("failed to convert object DIJob: %s", jobkey), "")
r.ctx.MarkIncorrectJobFailed(obj)
return
}
oldStatus := job.Status.DeepCopy()
// update job status
msg := fmt.Sprintf("DIJob %s created", job.Name)
if job.Status.Phase == "" {
r.ctx.UpdateJobStatus(job, div1alpha2.JobPending, dicontext.DIJobPendingReason, msg)
}
log.Info(fmt.Sprintf("DIJob %s created", jobkey))
if !apiequality.Semantic.DeepEqual(*oldStatus, job.Status) {
if err := r.ctx.UpdateDIJobStatusInCluster(job); err != nil {
log.Error(err, fmt.Sprintf("failed to update DIJob %s status", jobkey))
}
}
}
func (r *DIJobReconciler) onJobDelete(obj client.Object) {
jobkey := diutil.NamespacedName(obj.GetNamespace(), obj.GetName())
log := r.ctx.Log.WithValues("dijob", jobkey)
log.Info(fmt.Sprintf("DIJob %s deleted", jobkey))
}
@@ -34,7 +34,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"
div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2"
"opendilab.org/di-orchestrator/pkg/handler"
dicontext "opendilab.org/di-orchestrator/pkg/context"
//+kubebuilder:scaffold:imports
)
@@ -90,7 +90,7 @@ var _ = BeforeSuite(func() {
})
Expect(err).NotTo(HaveOccurred())
ctx := handler.NewContext(context.Background(),
ctx := dicontext.NewContext(context.Background(),
cfg,
k8sManager.GetClient(),
k8sManager.GetEventRecorderFor("di-operator"),
......
package handler
import (
"fmt"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2"
diutil "opendilab.org/di-orchestrator/pkg/utils"
)
type DIJobEventHandler struct {
Context *Context
}
// Create implements EventHandler
func (e *DIJobEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
e.Context.OnJobAddHandler(evt.Object)
}
// Update implements EventHandler
func (e *DIJobEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
}
// Delete implements EventHandler
func (e *DIJobEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
e.Context.OnJobDeleteHandler(evt.Object)
}
// Generic implements EventHandler
func (e *DIJobEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
}
// addDIJob is the event handler responsible for handling job add events
func (c *Context) OnJobAddHandler(obj client.Object) {
jobkey := diutil.NamespacedName(obj.GetNamespace(), obj.GetName())
log := c.Log.WithValues("dijob", jobkey)
job, ok := obj.(*div1alpha2.DIJob)
if !ok {
log.Error(fmt.Errorf("failed to convert object DIJob: %s", jobkey), "")
c.markIncorrectJobFailed(obj)
return
}
oldStatus := job.Status.DeepCopy()
// update job status
msg := fmt.Sprintf("DIJob %s created", job.Name)
if job.Status.Phase == "" {
c.UpdateJobStatus(job, div1alpha2.JobPending, DIJobPendingReason, msg)
}
log.Info(fmt.Sprintf("DIJob %s created", jobkey))
if !apiequality.Semantic.DeepEqual(*oldStatus, job.Status) {
if err := c.UpdateDIJobStatusInCluster(job); err != nil {
log.Error(err, fmt.Sprintf("failed to update DIJob %s status", jobkey))
}
}
}
func (c *Context) OnJobDeleteHandler(obj client.Object) {
jobkey := diutil.NamespacedName(obj.GetNamespace(), obj.GetName())
log := c.Log.WithValues("dijob", jobkey)
log.Info(fmt.Sprintf("DIJob %s deleted", jobkey))
}
package handler
import (
corev1 "k8s.io/api/core/v1"
div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2"
)
type JobStatusHandler func(job *div1alpha2.DIJob, pods []*corev1.Pod) error
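No JobStatusHandler is used elsewhere in this commit; as a purely illustrative sketch (the variable name and logging behaviour are invented for the example), a handler matching the type could be:

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2"
)

// logRunningPods is a hypothetical JobStatusHandler that only reports how
// many of the job's pods are currently running.
var logRunningPods JobStatusHandler = func(job *div1alpha2.DIJob, pods []*corev1.Pod) error {
	running := 0
	for _, pod := range pods {
		if pod.Status.Phase == corev1.PodRunning {
			running++
		}
	}
	fmt.Printf("DIJob %s: %d/%d pods running\n", job.Name, running, len(pods))
	return nil
}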