提交 1bc25fcd 编写于 作者: L liqingping

Merge branch 'feat/events' into 'develop'

Feat/events

See merge request platform/CloudNative4AI/cluster-lifecycle/nervex-operator!18
......@@ -9,13 +9,7 @@ rules:
- apiGroups:
- ""
resources:
- namespaces
verbs:
- get
- list
- apiGroups:
- ""
resources:
- events
- pods
- services
verbs:
......@@ -26,6 +20,13 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- namespaces
verbs:
- get
- list
- apiGroups:
- nervex.sensetime.com
resources:
......
......@@ -25,6 +25,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
......@@ -41,12 +42,13 @@ type NerveXJobReconciler struct {
Log logr.Logger
Scheme *runtime.Scheme
AGConfig string
Recorder record.EventRecorder
}
//+kubebuilder:rbac:groups=nervex.sensetime.com,resources=nervexjobs;aggregatorconfigs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=nervex.sensetime.com,resources=nervexjobs/status;aggregatorconfigs/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=nervex.sensetime.com,resources=nervexjobs/finalizers;aggregatorconfigs/finalizers,verbs=update
//+kubebuilder:rbac:groups="",resources=pods;services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=pods;services;events,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list
// Reconcile is part of the main kubernetes reconciliation loop which aims to
......@@ -63,8 +65,8 @@ func (r *NerveXJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// log.Info("reconcile nervexjob", "nervexjob", req.NamespacedName)
// get NerveXJob object
nvxJob := &nervexv1alpha1.NerveXJob{}
err := r.Get(ctx, req.NamespacedName, nvxJob)
job := &nervexv1alpha1.NerveXJob{}
err := r.Get(ctx, req.NamespacedName, job)
if err != nil {
if !errors.IsNotFound(err) {
log.Error(err, "failed to get NerveXJob", "job", req.NamespacedName)
......@@ -72,43 +74,50 @@ func (r *NerveXJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, nil
}
jobStatus := nvxJob.Status.DeepCopy()
jobStatus := job.Status.DeepCopy()
// list pods of NerveXJob
pods, err := nervexutil.ListPods(ctx, r.Client, nvxJob)
pods, err := nervexutil.ListPods(ctx, r.Client, job)
if err != nil {
log.Error(err, "failed to list pods of NerveXJob", "job", req.NamespacedName)
return ctrl.Result{}, nil
}
// list services of NerveXJob
services, err := nervexutil.ListServices(ctx, r.Client, nvxJob)
services, err := nervexutil.ListServices(ctx, r.Client, job)
if err != nil {
log.Error(err, "failed to list services of NerveXJob", "job", req.NamespacedName)
return ctrl.Result{}, nil
}
// check the phase of NerveXJob
if isSucceeded(nvxJob) || isFailed(nvxJob) {
if err := r.deletePodsAndServices(ctx, nvxJob, pods, services); err != nil {
if isSucceeded(job) || isFailed(job) {
if err := r.deletePodsAndServices(ctx, job, pods, services); err != nil {
log.Error(err, "failed to delete pods and services of NerveXJob", "job", req.NamespacedName)
return ctrl.Result{}, nil
}
if isSucceeded(job) {
for rtype := range job.Status.ReplicaStatus {
job.Status.ReplicaStatus[rtype].Succeeded += job.Status.ReplicaStatus[rtype].Active
job.Status.ReplicaStatus[rtype].Active = 0
}
}
return ctrl.Result{}, nil
}
// initialize NerveXJob status
initializeNerveXJobReplicaStatus(nvxJob)
initializeNerveXJobReplicaStatus(job)
if err := r.reconcilePods(ctx, nvxJob, pods); err != nil {
if err := r.reconcilePods(ctx, job, pods); err != nil {
log.Error(err, "failed to reconcile pods", "job", req.NamespacedName)
return ctrl.Result{}, nil
}
// update status
defer func() {
if !apiequality.Semantic.DeepEqual(*jobStatus, nvxJob.Status) {
if err := r.updateNerveXJobStatus(ctx, nvxJob); err != nil {
if !apiequality.Semantic.DeepEqual(*jobStatus, job.Status) {
if err := r.updateNerveXJobStatusInCluster(ctx, job); err != nil {
log.Error(err, "failed to update NerveXJobStatus", "job", req.NamespacedName)
}
}
......@@ -124,7 +133,7 @@ func (r *NerveXJobReconciler) deletePodsAndServices(ctx context.Context, job *ne
// delete services of NerveXJob
for _, svc := range services {
if err := r.Delete(ctx, svc, &client.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
if err := r.deleteService(ctx, job, svc); err != nil {
return err
}
}
......@@ -159,9 +168,9 @@ func (r *NerveXJobReconciler) deletePodsAndServices(ctx context.Context, job *ne
continue
}
msg := fmt.Sprintf("Delete pod %s of job %s/%s, since the CleanPodPolicy is %s", pod.Name, job.Namespace, job.Name, job.Spec.CleanPodPolicy)
msg := fmt.Sprintf("Delete pod %s of job %s/%s", pod.Name, job.Namespace, job.Name)
log.Info(msg)
if err := r.Delete(ctx, pod, &client.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
if err := r.deletePod(ctx, job, pod); err != nil {
return err
}
}
......
......@@ -7,6 +7,7 @@ import (
nervexv1alpha1 "go-sensephoenix.sensetime.com/nervex-operator/api/v1alpha1"
nervexutil "go-sensephoenix.sensetime.com/nervex-operator/utils"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
......@@ -23,7 +24,7 @@ func (r *NerveXJobReconciler) reconcilePods(ctx context.Context, job *nervexv1al
// update NerveXJob status if coordinator and aggregator are created
if coordinator != nil && ag != nil {
if err := r.checkPodsStatus(ctx, job, collectors, learners, coordinator, ag); err != nil {
if err := r.updateNerveXJobStatus(ctx, job, collectors, learners, coordinator, ag); err != nil {
return err
}
} else {
......@@ -65,21 +66,22 @@ func (r *NerveXJobReconciler) reconcilePods(ctx context.Context, job *nervexv1al
nervexutil.SetPodEnv(agpod, envs)
if coordinator == nil {
if err := r.createPodAndService(ctx, coorpod, coorsvc); err != nil {
if err := r.createPodAndService(ctx, job, coorpod, coorsvc); err != nil {
return err
}
}
if ag == nil {
if err := r.createPodAndService(ctx, agpod, agsvc); err != nil {
if err := r.createPodAndService(ctx, job, agpod, agsvc); err != nil {
return err
}
}
// update job status
job.Status.Phase = nervexv1alpha1.JobCreated
msg := fmt.Sprintf("NerveXJob %s created", job.Name)
updateNerveXJobConditions(job, nervexv1alpha1.JobCreated, NerveXJobCreatedReason, msg)
if err := r.updateJobPhase(ctx, job, nervexv1alpha1.JobCreated, NerveXJobCreatedReason, msg); err != nil {
return err
}
}
return nil
}
......@@ -95,7 +97,6 @@ func (r *NerveXJobReconciler) checkPodsStatus(ctx context.Context, job *nervexv1
job.Status.Phase = nervexv1alpha1.JobRunning
msg := fmt.Sprintf("coordinator and aggregator of NerveXJob %s are running", job.Name)
updateNerveXJobConditions(job, nervexv1alpha1.JobRunning, NerveXJobRunningReason, msg)
} else if job.Status.ReplicaStatus[nervexv1alpha1.ReplicaTypeCoordinator].Failed > 0 {
job.Status.Phase = nervexv1alpha1.JobFailed
msg := fmt.Sprintf("NerveXJob %s failed because coordinator failed", job.Name)
......@@ -107,23 +108,66 @@ func (r *NerveXJobReconciler) checkPodsStatus(ctx context.Context, job *nervexv1
msg := fmt.Sprintf("NerveXJob %s succeeded because coordinator succeeded", job.Name)
log.Info(msg)
updateNerveXJobConditions(job, nervexv1alpha1.JobSucceeded, NerveXJobSucceededReason, msg)
}
return nil
}
func (r *NerveXJobReconciler) createPodAndService(ctx context.Context, pod *corev1.Pod, svc *corev1.Service) error {
log := r.Log.WithValues("nervexjob", nervexutil.NamespacedName(pod.Namespace, pod.Name))
func (r *NerveXJobReconciler) createPodAndService(ctx context.Context, job *nervexv1alpha1.NerveXJob, pod *corev1.Pod, svc *corev1.Service) error {
log := r.Log.WithValues("nervexjob", nervexutil.NamespacedName(job.Namespace, job.Name))
log.Info("create pod ", "pod name:", pod)
if err := r.createPod(ctx, job, pod); err != nil {
return err
}
log.Info("create service ", "service name:", svc)
if err := r.createService(ctx, job, svc); err != nil {
return err
}
return nil
}
func (r *NerveXJobReconciler) createPod(ctx context.Context, job *nervexv1alpha1.NerveXJob, pod *corev1.Pod) error {
if err := r.Create(ctx, pod, &client.CreateOptions{}); err != nil {
msg := fmt.Sprintf("Failed to create pod: %s error: %v", pod.Name, err)
r.Recorder.Eventf(job, corev1.EventTypeWarning, FailedCreateReason, msg)
return fmt.Errorf("failed to create pod: %v", err)
}
msg := fmt.Sprintf("Create pod: %s", pod.Name)
r.Recorder.Eventf(job, corev1.EventTypeNormal, SuccessfulCreateReason, msg)
return nil
}
log.Info("create service ", "service name:", svc)
if err := r.Create(ctx, svc, &client.CreateOptions{}); err != nil {
func (r *NerveXJobReconciler) createService(ctx context.Context, job *nervexv1alpha1.NerveXJob, service *corev1.Service) error {
if err := r.Create(ctx, service, &client.CreateOptions{}); err != nil {
msg := fmt.Sprintf("Failed to create service: %s error: %v", service.Name, err)
r.Recorder.Eventf(job, corev1.EventTypeWarning, FailedCreateReason, msg)
return fmt.Errorf("failed to create service: %v", err)
}
msg := fmt.Sprintf("Create service: %s", service.Name)
r.Recorder.Eventf(job, corev1.EventTypeNormal, SuccessfulCreateReason, msg)
return nil
}
func (r *NerveXJobReconciler) deletePod(ctx context.Context, job *nervexv1alpha1.NerveXJob, pod *corev1.Pod) error {
if err := r.Delete(ctx, pod, &client.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
msg := fmt.Sprintf("Failed to delete pod: %s error: %v", pod.Name, err)
r.Recorder.Eventf(job, corev1.EventTypeWarning, FailedDeleteReason, msg)
return fmt.Errorf("failed to delete pod: %v", err)
}
msg := fmt.Sprintf("Delete pod: %s", pod.Name)
r.Recorder.Eventf(job, corev1.EventTypeNormal, SuccessfulDeleteReason, msg)
return nil
}
func (r *NerveXJobReconciler) deleteService(ctx context.Context, job *nervexv1alpha1.NerveXJob, service *corev1.Service) error {
if err := r.Delete(ctx, service, &client.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
msg := fmt.Sprintf("Failed to delete service: %s error: %v", service.Name, err)
r.Recorder.Eventf(job, corev1.EventTypeWarning, FailedDeleteReason, msg)
return fmt.Errorf("failed to delete service: %v", err)
}
msg := fmt.Sprintf("Delete service: %s", service.Name)
r.Recorder.Eventf(job, corev1.EventTypeNormal, SuccessfulDeleteReason, msg)
return nil
}
......
......@@ -2,6 +2,7 @@ package controllers
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
......@@ -14,6 +15,11 @@ import (
)
const (
SuccessfulDeleteReason = "SuccessfulDelete"
FailedDeleteReason = "FailedDelete"
SuccessfulCreateReason = "SuccessfulCreate"
FailedCreateReason = "FailedCreate"
NerveXJobCreatedReason = "NerveXJobCreated"
NerveXJobRunningReason = "NerveXJobRunning"
NerveXJobFailedReason = "NerveXJobFailed"
......@@ -23,7 +29,34 @@ const (
statusUpdatedPauseDuration = 50 * time.Millisecond
)
func (r *NerveXJobReconciler) updateNerveXJobStatus(ctx context.Context, job *nervexv1alpha1.NerveXJob) error {
func (r *NerveXJobReconciler) updateNerveXJobStatus(ctx context.Context, job *nervexv1alpha1.NerveXJob,
collectors []*corev1.Pod, learners []*corev1.Pod, coordinator *corev1.Pod, aggregator *corev1.Pod) error {
// update replica status
updateReplicasStatues(job, collectors, learners, coordinator, aggregator)
if job.Status.ReplicaStatus[nervexv1alpha1.ReplicaTypeCoordinator].Active > 0 &&
job.Status.ReplicaStatus[nervexv1alpha1.ReplicaTypeAggregator].Active > 0 {
msg := fmt.Sprintf("coordinator and aggregator of NerveXJob %s are running", job.Name)
if err := r.updateJobPhase(ctx, job, nervexv1alpha1.JobRunning, NerveXJobRunningReason, msg); err != nil {
return err
}
} else if job.Status.ReplicaStatus[nervexv1alpha1.ReplicaTypeCoordinator].Failed > 0 {
msg := fmt.Sprintf("NerveXJob %s failed because coordinator failed", job.Name)
if err := r.updateJobPhase(ctx, job, nervexv1alpha1.JobFailed, NerveXJobFailedReason, msg); err != nil {
return err
}
} else if job.Status.ReplicaStatus[nervexv1alpha1.ReplicaTypeCoordinator].Succeeded > 0 {
msg := fmt.Sprintf("NerveXJob %s succeeded because coordinator succeeded", job.Name)
if err := r.updateJobPhase(ctx, job, nervexv1alpha1.JobSucceeded, NerveXJobSucceededReason, msg); err != nil {
return err
}
}
return nil
}
func (r *NerveXJobReconciler) updateNerveXJobStatusInCluster(ctx context.Context, job *nervexv1alpha1.NerveXJob) error {
var err error
for i := 0; i < statusUpdateRetries; i++ {
newJob := &nervexv1alpha1.NerveXJob{}
......@@ -40,6 +73,25 @@ func (r *NerveXJobReconciler) updateNerveXJobStatus(ctx context.Context, job *ne
return err
}
func (r *NerveXJobReconciler) updateJobPhase(
ctx context.Context, job *nervexv1alpha1.NerveXJob, phase nervexv1alpha1.Phase, reason string, msg string) error {
job.Status.Phase = phase
updateNerveXJobConditions(job, phase, reason, msg)
switch phase {
case nervexv1alpha1.JobCreated, nervexv1alpha1.JobRunning:
// ignore events when job are created or running
case nervexv1alpha1.JobFailed:
r.Recorder.Eventf(job, corev1.EventTypeWarning, reason, msg)
case nervexv1alpha1.JobSucceeded:
r.Recorder.Eventf(job, corev1.EventTypeNormal, reason, msg)
default:
r.Recorder.Eventf(job, corev1.EventTypeNormal, reason, msg)
}
return nil
}
func initializeNerveXJobReplicaStatus(job *nervexv1alpha1.NerveXJob) {
if job.Status.ReplicaStatus == nil {
job.Status.ReplicaStatus = make(map[nervexv1alpha1.ReplicaType]*nervexv1alpha1.ReplicaStatus)
......
......@@ -118,6 +118,7 @@ var _ = BeforeSuite(func() {
Client: k8sManager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("NerveXJob"),
AGConfig: key.String(),
Recorder: k8sManager.GetEventRecorderFor("nervex-operator"),
}).SetupWithManager(k8sManager)
Expect(err).NotTo(HaveOccurred())
......
......@@ -91,6 +91,7 @@ func main() {
Log: ctrl.Log.WithName("controllers").WithName("NerveXJob"),
Scheme: mgr.GetScheme(),
AGConfig: agconfig,
Recorder: mgr.GetEventRecorderFor("nervex-operator"),
}
if err = reconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NerveXJob")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册