diff --git a/controllers/dijob_controller_test.go b/controllers/dijob_controller_test.go index 34e063412df91ffd0acfd766dfd70deb207004ab..84190975de8d4e5c3c90266f3485c2369cc58ae1 100644 --- a/controllers/dijob_controller_test.go +++ b/controllers/dijob_controller_test.go @@ -3,7 +3,6 @@ package controllers import ( "context" "fmt" - "strings" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -27,69 +26,41 @@ var _ = Describe("DIJob Controller", func() { ctx := context.Background() jobTmpl := testutil.NewDIJob() dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl) + replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator") + podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName} By("Update coordinator to Running") - for _, replicaName := range []string{ - diutil.ReplicaPodName(dijob.Name, "coordinator"), - } { - podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName} - err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodRunning) - Expect(err).NotTo(HaveOccurred()) - } + err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodRunning) + Expect(err).NotTo(HaveOccurred()) - var createdDIjob div1alpha1.DIJob By("Checking the created DIJob has enough coordinator") - for _, rtype := range []div1alpha1.ReplicaType{div1alpha1.ReplicaTypeCoordinator} { - Eventually(func() int { - err := k8sClient.Get(ctx, jobKey, &createdDIjob) - if err != nil { - return -1 - } - if createdDIjob.Status.ReplicaStatus == nil { - return -1 - } - return int(createdDIjob.Status.ReplicaStatus[rtype].Active) - }, timeout, interval).Should(Equal(1)) + coorStatus := make([]int, 3) + coorStatus[0] = 1 + replicasStatuses := map[div1alpha1.ReplicaType][]int{ + div1alpha1.ReplicaTypeCoordinator: coorStatus, } + checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses) By("Checking the created DIJob is in Running state") - Eventually(func() bool { - err := k8sClient.Get(ctx, jobKey, &createdDIjob) - if err != nil { - 
return false - } - return createdDIjob.Status.Phase == div1alpha1.JobRunning - }, duration, interval).Should(BeTrue()) + checkDIJobPhase(ctx, k8sClient, jobKey, div1alpha1.JobRunning) By("Update coordinator to Succeeded") - for _, replicaName := range []string{ - diutil.ReplicaPodName(createdDIjob.Name, "coordinator"), - } { - podKey := types.NamespacedName{Namespace: createdDIjob.Namespace, Name: replicaName} - err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodSucceeded) - Expect(err).NotTo(HaveOccurred()) - } + err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodSucceeded) + Expect(err).NotTo(HaveOccurred()) By("Checking the job is succeeded") - Eventually(func() div1alpha1.Phase { - err := k8sClient.Get(ctx, jobKey, &createdDIjob) - if err != nil { - return div1alpha1.JobUnknown - } - return createdDIjob.Status.Phase - }, timeout, interval).Should(Equal(div1alpha1.JobSucceeded)) + checkDIJobPhase(ctx, k8sClient, jobKey, div1alpha1.JobSucceeded) By("Checking the coordinator is succeeded") - Eventually(func() int { - err := k8sClient.Get(ctx, jobKey, &createdDIjob) - if err != nil { - return -1 - } - return int(createdDIjob.Status.ReplicaStatus[div1alpha1.ReplicaTypeCoordinator].Succeeded) - }, timeout, interval).Should(Equal(1)) + coorStatus = make([]int, 3) + coorStatus[2] = 1 + replicasStatuses = map[div1alpha1.ReplicaType][]int{ + div1alpha1.ReplicaTypeCoordinator: coorStatus, + } + checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses) By("Cleaning up") - err = testutil.CleanUpJob(ctx, k8sClient, createdDIjob.DeepCopy()) + err = testutil.CleanUpJob(ctx, k8sClient, &dijob) Expect(err).NotTo(HaveOccurred()) }) @@ -111,53 +82,33 @@ var _ = Describe("DIJob Controller", func() { ctx := context.Background() jobTmpl := testutil.NewDIJob() dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl) + replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator") + podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: 
replicaName} By("Update coordinator status") - for _, replicaName := range []string{ - diutil.ReplicaPodName(dijob.Name, "coordinator"), - } { - podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName} - if strings.HasSuffix(replicaName, "coordinator") { - err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, c.coorStatus) - } - Expect(err).NotTo(HaveOccurred()) - } + err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, c.coorStatus) + Expect(err).NotTo(HaveOccurred()) By("Checking the created DIJob has enough coordinator") - Eventually(func() int { - err := k8sClient.Get(ctx, jobKey, &dijob) - if err != nil { - return -1 - } - if dijob.Status.ReplicaStatus == nil { - return -1 - } - - // get phase - var phase corev1.PodPhase = c.coorStatus - count := 0 - switch phase { - case corev1.PodRunning: - count = int(dijob.Status.ReplicaStatus[div1alpha1.ReplicaTypeCoordinator].Active) - case corev1.PodFailed: - count = int(dijob.Status.ReplicaStatus[div1alpha1.ReplicaTypeCoordinator].Failed) - case corev1.PodSucceeded: - count = int(dijob.Status.ReplicaStatus[div1alpha1.ReplicaTypeCoordinator].Succeeded) - } - return count - }, timeout, interval).Should(Equal(1)) + coorStatus := make([]int, 3) + switch c.coorStatus { + case corev1.PodRunning: + coorStatus[0] = 1 + case corev1.PodFailed: + coorStatus[1] = 1 + case corev1.PodSucceeded: + coorStatus[2] = 1 + } + replicasStatuses := map[div1alpha1.ReplicaType][]int{ + div1alpha1.ReplicaTypeCoordinator: coorStatus, + } + checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses) By("Checking the created DIJob's state") - Eventually(func() div1alpha1.Phase { - err := k8sClient.Get(ctx, jobKey, &dijob) - if err != nil { - return div1alpha1.JobUnknown - } - return dijob.Status.Phase - }, timeout, interval).Should(Equal(c.expectStatus)) + checkDIJobPhase(ctx, k8sClient, jobKey, c.expectStatus) By("Cleaning up") - err = testutil.CleanUpJob(ctx, k8sClient, dijob.DeepCopy()) + err = testutil.CleanUpJob(ctx, 
k8sClient, &dijob) Expect(err).NotTo(HaveOccurred()) } }) @@ -208,6 +159,8 @@ var _ = Describe("DIJob Controller", func() { ctx := context.Background() jobTmpl := testutil.NewDIJob() dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl) + replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator") + podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName} // build owner reference ownRefer := diutil.NewOwnerReference(div1alpha1.GroupVersion.String(), div1alpha1.KindDIJob, dijob.Name, dijob.UID, true) @@ -224,92 +177,47 @@ var _ = Describe("DIJob Controller", func() { } By("Checking the ReplicaStatus is as expected") - for _, rtype := range []div1alpha1.ReplicaType{ - div1alpha1.ReplicaTypeCollector, - div1alpha1.ReplicaTypeLearner, - } { - var status []int - switch rtype { - case div1alpha1.ReplicaTypeCollector: - status = colStatus - case div1alpha1.ReplicaTypeLearner: - status = lrStatus - } - Eventually(func() []int { - err = k8sClient.Get(ctx, jobKey, &dijob) - if err != nil { - return nil - } - result := make([]int, 3) - result[0] = int(dijob.Status.ReplicaStatus[rtype].Active) - result[1] = int(dijob.Status.ReplicaStatus[rtype].Failed) - result[2] = int(dijob.Status.ReplicaStatus[rtype].Succeeded) - return result - }, timeout, interval).Should(Equal(status)) + replicasStatuses := map[div1alpha1.ReplicaType][]int{ + div1alpha1.ReplicaTypeCollector: colStatus, + div1alpha1.ReplicaTypeLearner: lrStatus, } + checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses) - By("Update coordinator to Succeeded") - for _, replicaName := range []string{ - diutil.ReplicaPodName(dijob.Name, "coordinator"), - } { - podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName} - err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodSucceeded) + By("Checking the services are as expected") + Eventually(func() int { + svcs, err := diutil.ListServices(ctx, k8sClient, &dijob) Expect(err).NotTo(HaveOccurred()) - } + return 
len(svcs) + }, timeout, interval).Should(Equal(1 + len(c.collectors) + len(c.learners))) + + By("Update coordinator to Succeeded") + err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodSucceeded) + Expect(err).NotTo(HaveOccurred()) By("Checking the job is successfully succeeded") - Eventually(func() div1alpha1.Phase { - err := k8sClient.Get(ctx, jobKey, &dijob) - if err != nil { - return div1alpha1.JobUnknown - } - return dijob.Status.Phase - }, timeout, interval).Should(Equal(div1alpha1.JobSucceeded)) - - By("Checking the coordinator is succeeded") - Eventually(func() int { - err := k8sClient.Get(ctx, jobKey, &dijob) - if err != nil { - return -1 - } - return int(dijob.Status.ReplicaStatus[div1alpha1.ReplicaTypeCoordinator].Succeeded) - }, timeout, interval).Should(Equal(1)) - - colStatus1 := make([]int, 3) - lrStatus1 := make([]int, 3) - colStatus1[0] = 0 - colStatus1[1] = colStatus[1] - colStatus1[2] = colStatus[0] + colStatus[2] - lrStatus1[0] = 0 - lrStatus1[1] = lrStatus[1] - lrStatus1[2] = lrStatus[0] + lrStatus[2] + checkDIJobPhase(ctx, k8sClient, jobKey, div1alpha1.JobSucceeded) By("Checking the ReplicaStatus is as expected") - for _, rtype := range []div1alpha1.ReplicaType{ - div1alpha1.ReplicaTypeCollector, - div1alpha1.ReplicaTypeLearner, - } { - var status []int - switch rtype { - case div1alpha1.ReplicaTypeCollector: - status = colStatus1 - case div1alpha1.ReplicaTypeLearner: - status = lrStatus1 - } - Eventually(func() []int { - err = k8sClient.Get(ctx, jobKey, &dijob) - if err != nil { - return nil - } - result := make([]int, 3) - result[0] = int(dijob.Status.ReplicaStatus[rtype].Active) - result[1] = int(dijob.Status.ReplicaStatus[rtype].Failed) - result[2] = int(dijob.Status.ReplicaStatus[rtype].Succeeded) - return result - }, timeout, interval).Should(Equal(status)) + coorStatus := make([]int, 3) + coorStatus[2] = 1 + + colFinishedStatus := make([]int, 3) + lrFinishedStatus := make([]int, 3) + colFinishedStatus[0] = 0 + 
colFinishedStatus[1] = colStatus[1] + colFinishedStatus[2] = colStatus[0] + colStatus[2] + lrFinishedStatus[0] = 0 + lrFinishedStatus[1] = lrStatus[1] + lrFinishedStatus[2] = lrStatus[0] + lrStatus[2] + + replicasStatuses = map[div1alpha1.ReplicaType][]int{ + div1alpha1.ReplicaTypeCoordinator: coorStatus, + div1alpha1.ReplicaTypeCollector: colFinishedStatus, + div1alpha1.ReplicaTypeLearner: lrFinishedStatus, } + checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses) - err = testutil.CleanUpJob(ctx, k8sClient, dijob.DeepCopy()) + err = testutil.CleanUpJob(ctx, k8sClient, &dijob) Expect(err).NotTo(HaveOccurred()) } }) @@ -329,10 +237,7 @@ func createDIJob(ctx context.Context, k8sClient client.Client, dijob *div1alpha1 createdDIjob := div1alpha1.DIJob{} Eventually(func() bool { err := k8sClient.Get(ctx, key, &createdDIjob) - if err != nil { - return false - } - return true + return err == nil }, timeout, interval).Should(BeTrue()) By("Checking coordinator are created") @@ -341,10 +246,7 @@ func createDIJob(ctx context.Context, k8sClient client.Client, dijob *div1alpha1 podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName} Eventually(func() bool { err = k8sClient.Get(ctx, podKey, &pod) - if err != nil { - return false - } - return true + return err == nil }, timeout, interval).Should(BeTrue()) return createdDIjob, key @@ -376,3 +278,35 @@ func createAndUpdatePodPhase( statuses[2]++ } } + /* checkDIJobPhase repeatedly fetches the DIJob stored under jobKey with k8sClient.Get and asserts, via Gomega's Eventually (using the package-level timeout and interval), that its Status.Phase becomes equal to phase. While the Get call returns an error the poll reports div1alpha1.JobUnknown, so a transient fetch failure is retried instead of failing the spec immediately. */ +func checkDIJobPhase(ctx context.Context, k8sClient client.Client, jobKey types.NamespacedName, phase div1alpha1.Phase) { + var dijob div1alpha1.DIJob + Eventually(func() div1alpha1.Phase { + err := k8sClient.Get(ctx, jobKey, &dijob) + if err != nil { + return div1alpha1.JobUnknown + } + return dijob.Status.Phase + }, timeout, interval).Should(Equal(phase)) +} + /* checkReplicasStatuses verifies the per-replica-type counters of the DIJob stored under jobKey. replicasStatuses maps each replica type to the expected 3-element slice ordered [Active, Failed, Succeeded]. Every map entry is checked with its own Eventually poll (package-level timeout and interval); the poll yields nil, which does not equal a non-nil expected slice, while Get fails or Status.ReplicaStatus is nil. Map iteration order is unspecified, so replica types are checked in no particular order. */ +func checkReplicasStatuses(ctx context.Context, k8sClient client.Client, jobKey types.NamespacedName, replicasStatuses map[div1alpha1.ReplicaType][]int) { + for rtype, status := range 
replicasStatuses { + var dijob div1alpha1.DIJob + Eventually(func() []int { + err := k8sClient.Get(ctx, jobKey, &dijob) + if err != nil { + return nil + } + if dijob.Status.ReplicaStatus == nil { + return nil + } + /* Flatten the counters into [Active, Failed, Succeeded] so they can be compared with the expected slice. NOTE(review): this indexes ReplicaStatus[rtype] without checking that an entry exists; whether a missing entry is safe depends on the map's element type, which is not visible here - confirm. */ + result := make([]int, 3) + result[0] = int(dijob.Status.ReplicaStatus[rtype].Active) + result[1] = int(dijob.Status.ReplicaStatus[rtype].Failed) + result[2] = int(dijob.Status.ReplicaStatus[rtype].Succeeded) + return result + }, timeout, interval).Should(Equal(status)) + } +}