diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 7d3e3847a17fce4a5a21d3e10e7d084ff61109f0..f0f44f71b168aa4ecf80ef0a32327f2db793d999 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -97,6 +97,7 @@ tag: - cloudnative4ai-group-runner-phoenix services: - registry.sensetime.com/cloudnative4ai/docker:19.03.8-dind + when: manual allow_failure: false dependencies: - build-release diff --git a/Makefile b/Makefile index c9aba964f657a7c21a1f94be0831a82118031fe3..ab7f30a430d75443f92ad452b1d1daf5671c97f7 100644 --- a/Makefile +++ b/Makefile @@ -85,10 +85,11 @@ lint: .PHONY: test test: ginkgo ## Run tests. - $(GINKGO) -nodes 4 -v -cover -coverprofile=coverage.out ./pkg/... - go tool cover -func=./pkg/controllers/coverage.out + # $(GINKGO) -nodes 4 -v -cover -coverprofile=coverage.out ./pkg/... + $(GINKGO) -cover -coverprofile=coverage.out ./pkg/... go tool cover -func=./pkg/server/coverage.out - go tool cover -func=./pkg/common/gpuallocator/coverage.out + go tool cover -func=./pkg/common/coverage.out + go tool cover -func=./pkg/controllers/coverage.out ##@ Build diff --git a/cmd/server/server.go b/cmd/server/server.go index 5842a4208f4f24f69793bc3091e223d801add8d0..6726d59bd16c02362337ddcc5d3d8de05ca81c7f 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -68,7 +68,7 @@ func NewCmdServer(genFlags cmdcommon.GenericFlags) *cobra.Command { Examples: # Start di-server with gpu allocation policy and bind address specified. - di-orchestrator server -p simple -b :8080 + di-orchestrator server -p :8080 -s :8081 `, Run: func(cmd *cobra.Command, args []string) { cobra.CheckErr(runCommand(cmd, o)) diff --git a/pkg/common/config_test.go b/pkg/common/config_test.go index 4bbe64486ad0df46e6cce5058ca6d15625a00eaa..6d150e550fc764b4e6072dc1bd6b1d96a1c70f49 100644 --- a/pkg/common/config_test.go +++ b/pkg/common/config_test.go @@ -1,6 +1,7 @@ package common import ( + "fmt" "os" "testing" @@ -14,16 +15,33 @@ func TestConfig(t *testing.T) { RegisterFailHandler(Fail) RunSpecsWithDefaultAndCustomReporters(t, - "Config Suite", + "CommonConfig Suite", []Reporter{printer.NewlineReporter{}}) } -var _ = Describe("GetDIJobDefaultResources", func() { - defaultResource := `{"resources": {"requests": {"cpu": 1, "memory": "2Gi"}}}` - err := os.Setenv("DI_JOB_DEFAULT_RESOURCES", defaultResource) - Expect(err).NotTo(HaveOccurred()) - r, err := GetDIJobDefaultResources() - Expect(err).NotTo(HaveOccurred()) - Expect(r.Requests.Cpu().Equal(resource.MustParse("1"))).Should(BeTrue()) - Expect(r.Requests.Memory().Equal(resource.MustParse("2Gi"))).Should(BeTrue()) +var _ = Describe("Test common config", func() { + Context("Get DIJob default resources", func() { + It("returns the default resources", func() { + type testCase struct { + resource string + expectCPU string + expectMem string + } + testCases := []testCase{ + {resource: `{"resources": {"requests": {"cpu": 1, "memory": "2Gi"}}}`, expectCPU: "1", expectMem: "2Gi"}, + {resource: `{"resources": {"requests": {"cpu": 2, "memory": "3Gi"}}}`, expectCPU: "2", expectMem: "3Gi"}, + {resource: "", expectCPU: "1", expectMem: "2Gi"}, + } + for i := range testCases { + c := testCases[i] + By(fmt.Sprintf("Create the %dth DIJob", i+1)) + err := os.Setenv("DI_JOB_DEFAULT_RESOURCES", c.resource) + Expect(err).NotTo(HaveOccurred()) + r, err := GetDIJobDefaultResources() + Expect(err).NotTo(HaveOccurred()) + Expect(r.Requests.Cpu().Equal(resource.MustParse(c.expectCPU))).Should(BeTrue()) + Expect(r.Requests.Memory().Equal(resource.MustParse(c.expectMem))).Should(BeTrue()) + } + }) + }) 
}) diff --git a/pkg/common/gpuallocator/gpu_allocator.go b/pkg/common/gpuallocator/gpu_allocator.go deleted file mode 100644 index b6673016872f56cb71aa37fd87faa4a42e18f183..0000000000000000000000000000000000000000 --- a/pkg/common/gpuallocator/gpu_allocator.go +++ /dev/null @@ -1,132 +0,0 @@ -package gpuallocator - -import ( - "fmt" - - corev1 "k8s.io/api/core/v1" -) - -type GPUAllocator struct { - Nodes []*corev1.Node - - policy Policy -} - -func NewSimpleGPUAllocator(nodes []*corev1.Node) *GPUAllocator { - return &GPUAllocator{Nodes: nodes, policy: NewSimplePolicy()} -} - -func (g *GPUAllocator) Allocate(gpus int) []int { - return g.policy.Allocate(g.Nodes, gpus) -} - -func (g *GPUAllocator) NumGPUsOfMajorityNodeType() int { - return GetGPUsMajority(g.Nodes) -} - -const ( - SimpleGPUAllocPolicy = "simple" -) - -type Policy interface { - Allocate(nodes []*corev1.Node, gpus int) []int -} - -type SimplePolicy struct{} - -func NewSimplePolicy() *SimplePolicy { - return &SimplePolicy{} -} - -func (s *SimplePolicy) Allocate(nodes []*corev1.Node, gpus int) []int { - // gpusMajority is the node gpus with most frequent occurrence. - // maxGPUCount is the number of nodes with gpus equal to gpusMajority - gpusMajority := GetGPUsMajority(nodes) - if gpusMajority <= 0 { - return nil - } - perNodeGPUs := Max(gpusMajority, 1) - - if gpus < perNodeGPUs { - return []int{gpus} - } - - var result []int - nResults := gpus / perNodeGPUs - for i := 0; i < nResults; i++ { - result = append(result, perNodeGPUs) - } - remainGPUs := gpus - nResults*perNodeGPUs - if remainGPUs > 0 { - result = append(result, remainGPUs) - } - return result -} - -func Max(x, y int) int { - if x < y { - return y - } - return x -} - -func MaxInArray(v []int) (int, error) { - if len(v) == 0 { - return 0, fmt.Errorf("empty list") - } - max := v[0] - for _, i := range v { - if i > max { - max = i - } - } - return max, nil -} - -func GetGPUsMajority(nodes []*corev1.Node) int { - var nodeGPUCounts []int - for _, node := range nodes { - allocGPUs := node.Status.Allocatable[corev1.ResourceName("nvidia.com/gpu")] - nodeGPUCounts = append(nodeGPUCounts, int(allocGPUs.Value())) - } - - // gpusMajority is the number of gpus of majority nodes. - // majorityNodes is the number of nodes with gpus equal to gpusMajority - gpusMajority, _ := ValueOccursMostFrequentInList(nodeGPUCounts) - - if gpusMajority == 0 { - max, _ := MaxInArray(nodeGPUCounts) - return max - } - return gpusMajority -} - -// ValueOccursMostFrequentInList returns value that occurs most frequently in list, -// and the count of occurrences. -func ValueOccursMostFrequentInList(list []int) (int, int) { - if len(list) == 0 { - return 0, 0 - } - - // map the occurrence frequency of each value - maxCount := 0 - maxCountValue := 0 - valuesMap := make(map[int]int) - - for _, v := range list { - if valuesMap[v] != 0 { - valuesMap[v]++ - } else { - valuesMap[v] = 1 - } - - if maxCount < valuesMap[v] { - maxCount = valuesMap[v] - maxCountValue = v - } else if maxCount == valuesMap[v] && maxCountValue < v { - maxCountValue = v - } - } - - return maxCountValue, maxCount -} diff --git a/pkg/common/gpuallocator/gpu_allocator_test.go b/pkg/common/gpuallocator/gpu_allocator_test.go deleted file mode 100644 index 36e637a9e1127522b838313b2c7311fa24cfc76e..0000000000000000000000000000000000000000 --- a/pkg/common/gpuallocator/gpu_allocator_test.go +++ /dev/null @@ -1,110 +0,0 @@ -package gpuallocator - -import ( - "testing" - - . "github.com/onsi/ginkgo" - . 
"github.com/onsi/gomega" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - "sigs.k8s.io/controller-runtime/pkg/envtest/printer" -) - -func TestAllocators(t *testing.T) { - RegisterFailHandler(Fail) - - RunSpecsWithDefaultAndCustomReporters(t, - "Allocator Suite", - []Reporter{printer.NewlineReporter{}}) -} - -var _ = Describe("Test SimpleGPUAllocator", func() { - It("ValueOccursMostFrequentInList function", func() { - testCases := map[string]struct { - list []int - expectedValue int - expectedCount int - }{ - "Only one max value": { - []int{1, 2, 3, 4, 5, 2, 2, 4, 6, 2, 3, 3, 1}, - 2, 4, - }, - "Multi max value": { - []int{1, 2, 3, 4, 5, 2, 2, 4, 6, 2, 3, 3, 1, 3}, - 3, 4, - }, - "Multi max value second": { - []int{1, 3, 3, 4, 5, 2, 2, 4, 6, 2, 3, 2, 1, 3}, - 3, 4, - }, - } - for _, test := range testCases { - maxValue, maxCount := ValueOccursMostFrequentInList(test.list) - Expect(maxValue).To(Equal(test.expectedValue)) - Expect(maxCount).To(Equal(test.expectedCount)) - } - }) - - It("Allocate function", func() { - testCases := map[string]struct { - nodeGPUs map[int]int - gpus int - result []int - }{ - "Only one max value with 12 gpus request": { - map[int]int{ - 8: 4, - 10: 3, - 6: 3, - }, - 12, []int{8, 4}, - }, - "Only one max value with 16 gpus request": { - map[int]int{ - 8: 4, - 10: 3, - 6: 3, - }, - 16, []int{8, 8}, - }, - "Multi max value with 16 gpus request": { - map[int]int{ - 8: 4, - 10: 4, - 6: 3, - }, - 16, []int{10, 6}, - }, - "Multi max value with 8 gpus request": { - map[int]int{ - 8: 4, - 10: 4, - 6: 3, - }, - 8, []int{8}, - }, - } - for _, test := range testCases { - var nodes []*corev1.Node - for nodeSpec, nodeGPUs := range test.nodeGPUs { - for i := 0; i < nodeGPUs; i++ { - nodes = append(nodes, newNode(nodeSpec)) - } - } - - alloc := NewSimpleGPUAllocator(nodes) - result := alloc.Allocate(test.gpus) - Expect(result).To(Equal(test.result)) - } - }) -}) - -func newNode(gpus int) *corev1.Node { - return &corev1.Node{ - Status: corev1.NodeStatus{ - Allocatable: corev1.ResourceList{ - "nvidia.com/gpu": *resource.NewQuantity(int64(gpus), resource.DecimalExponent), - }, - }, - } -} diff --git a/pkg/controllers/dijob_controller_test.go b/pkg/controllers/dijob_controller_test.go index 57958fffdbc49fd4655ab433bc1d6d577e6f0474..b66173ef760deb0e0f53d607a343319efc681430 100644 --- a/pkg/controllers/dijob_controller_test.go +++ b/pkg/controllers/dijob_controller_test.go @@ -1,406 +1,428 @@ package controllers -// import ( -// "context" -// "fmt" - -// . "github.com/onsi/ginkgo" -// . 
"github.com/onsi/gomega" -// corev1 "k8s.io/api/core/v1" -// "k8s.io/apimachinery/pkg/api/resource" -// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -// "k8s.io/apimachinery/pkg/types" -// "sigs.k8s.io/controller-runtime/pkg/client" - -// div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1" -// dicommon "opendilab.org/di-orchestrator/pkg/common" -// commontypes "opendilab.org/di-orchestrator/pkg/common/types" -// diutil "opendilab.org/di-orchestrator/pkg/utils" -// testutil "opendilab.org/di-orchestrator/pkg/utils/testutils" -// ) - -// var _ = Describe("DIJob Controller", func() { - -// Context("When creating a DIJob", func() { -// It("Should be succeeded", func() { -// By("Create a DIJob") -// var err error -// ctx := context.Background() -// jobTmpl := testutil.NewDIJob() -// dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl) -// checkCoordinatorCreated(ctx, dijob) - -// By("Checking the created DIJob is in Created state") -// checkDIJobPhase(ctx, k8sClient, jobKey, div2alpha1.JobPending) - -// replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator") -// podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName} -// By("Update coordinator to Running") -// err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodRunning) -// Expect(err).NotTo(HaveOccurred()) - -// By("Checking the created DIJob has enough coordinator") -// coorStatus := make([]int, 3) -// coorStatus[0] = 1 -// replicasStatuses := map[div2alpha1.ReplicaType][]int{ -// div2alpha1.ReplicaTypeCoordinator: coorStatus, -// } -// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses) - -// By("Checking the created DIJob is in Running state") -// checkDIJobPhase(ctx, k8sClient, jobKey, div2alpha1.JobRunning) - -// By("Update coordinator to Succeeded") -// err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodSucceeded) -// Expect(err).NotTo(HaveOccurred()) - -// By("Checking the job is succeeded") -// checkDIJobPhase(ctx, k8sClient, jobKey, div2alpha1.JobSucceeded) - -// By("Checking the coordinator is succeeded") -// coorStatus = make([]int, 3) -// coorStatus[2] = 1 -// replicasStatuses = map[div2alpha1.ReplicaType][]int{ -// div2alpha1.ReplicaTypeCoordinator: coorStatus, -// } -// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses) - -// By("Cleaning up") -// err = testutil.CleanUpJob(ctx, k8sClient, &dijob) -// Expect(err).NotTo(HaveOccurred()) -// }) - -// It("DIJob status changed with components status", func() { -// type testCase struct { -// coorStatus corev1.PodPhase -// expectStatus div2alpha1.Phase -// } -// testCases := []testCase{ -// {coorStatus: corev1.PodRunning, expectStatus: div2alpha1.JobRunning}, -// {coorStatus: corev1.PodFailed, expectStatus: div2alpha1.JobFailed}, -// {coorStatus: corev1.PodSucceeded, expectStatus: div2alpha1.JobSucceeded}, -// } -// for i := range testCases { -// c := testCases[i] - -// By(fmt.Sprintf("Create the %dth DIJob", i+1)) -// var err error -// ctx := context.Background() -// jobTmpl := testutil.NewDIJob() -// dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl) -// checkCoordinatorCreated(ctx, dijob) - -// replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator") -// podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName} - -// By("Update coordinator status") -// err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, c.coorStatus) -// Expect(err).NotTo(HaveOccurred()) - -// By("Checking the created DIJob has enough coordinator") -// coorStatus := make([]int, 3) -// switch 
c.coorStatus { -// case corev1.PodRunning: -// coorStatus[0] = 1 -// case corev1.PodFailed: -// coorStatus[1] = 1 -// case corev1.PodSucceeded: -// coorStatus[2] = 1 -// } -// replicasStatuses := map[div2alpha1.ReplicaType][]int{ -// div2alpha1.ReplicaTypeCoordinator: coorStatus, -// } -// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses) - -// By("Checking the created DIJob's state") -// checkDIJobPhase(ctx, k8sClient, jobKey, c.expectStatus) - -// By("Cleaning up") -// err = testutil.CleanUpJob(ctx, k8sClient, &dijob) -// Expect(err).NotTo(HaveOccurred()) -// } -// }) - -// It("Should be marked as Created when submitted", func() { -// By("Create a DIJob") -// var err error -// ctx := context.Background() -// jobTmpl := testutil.NewDIJob() -// jobTmpl.Spec.Coordinator.Template.Spec.Containers[0].Resources.Limits = make(corev1.ResourceList) -// jobTmpl.Spec.Coordinator.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName("nvidia.com/gpu")] = -// resource.MustParse("1m") - -// dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl) - -// By("Checking the created DIJob is in Created state") -// checkDIJobPhase(ctx, k8sClient, jobKey, div2alpha1.JobPending) - -// By("Cleaning up") -// err = testutil.CleanUpJob(ctx, k8sClient, &dijob) -// Expect(err).NotTo(HaveOccurred()) -// }) -// }) -// Context("When creating a DIJob with collectors and learners", func() { -// It("Should record collector and learner status to job status", func() { -// type replica struct { -// name string -// status corev1.PodPhase -// } -// type testCase struct { -// collectors []replica -// learners []replica -// } -// testCases := []testCase{ -// { -// collectors: []replica{ -// {name: "job-collector-sdf", status: corev1.PodRunning}, -// }, -// learners: []replica{ -// {name: "job-learner-sdf", status: corev1.PodRunning}, -// }, -// }, -// { -// collectors: []replica{ -// {name: "job-collector-sdf", status: corev1.PodRunning}, -// {name: "job-collector-4tf", status: corev1.PodFailed}, -// }, -// learners: []replica{ -// {name: "job-learner-sdf", status: corev1.PodRunning}, -// }, -// }, -// { -// collectors: []replica{ -// {name: "job-collector-sdf", status: corev1.PodRunning}, -// {name: "job-collector-4tf", status: corev1.PodFailed}, -// }, -// learners: []replica{ -// {name: "job-learner-sdf", status: corev1.PodSucceeded}, -// {name: "job-learner-s4t", status: corev1.PodRunning}, -// }, -// }, -// } -// for i := range testCases { -// c := testCases[i] -// By(fmt.Sprintf("Create %dth DIJob", i+1)) -// var err error -// ctx := context.Background() -// jobTmpl := testutil.NewDIJob() -// dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl) -// checkCoordinatorCreated(ctx, dijob) - -// replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator") -// podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName} - -// // build owner reference -// ownRefer := diutil.NewOwnerReference(div2alpha1.GroupVersion.String(), div2alpha1.KindDIJob, dijob.Name, dijob.UID, true) - -// By(fmt.Sprintf("Create replicas for DIJob %s", dijob.Name)) -// colStatus := make([]int, 3) -// for _, col := range c.collectors { -// createAndUpdatePodPhase(ctx, k8sClient, col.name, dijob.Name, col.status, dicommon.CollectorName, ownRefer, colStatus) -// } - -// lrStatus := make([]int, 3) -// for _, lr := range c.learners { -// createAndUpdatePodPhase(ctx, k8sClient, lr.name, dijob.Name, lr.status, dicommon.LearnerName, ownRefer, lrStatus) -// } - -// By("Checking the ReplicaStatus is as expected") -// 
replicasStatuses := map[div2alpha1.ReplicaType][]int{ -// div2alpha1.ReplicaTypeCollector: colStatus, -// div2alpha1.ReplicaTypeLearner: lrStatus, -// } -// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses) - -// By("Checking the services are as expected") -// Eventually(func() int { -// svcs, err := diutil.ListServices(ctx, k8sClient, &dijob) -// Expect(err).NotTo(HaveOccurred()) -// return len(svcs) -// }, timeout, interval).Should(Equal(1 + len(c.collectors) + len(c.learners))) - -// By("Update coordinator to Succeeded") -// err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodSucceeded) -// Expect(err).NotTo(HaveOccurred()) - -// By("Checking the job is successfully succeeded") -// checkDIJobPhase(ctx, k8sClient, jobKey, div2alpha1.JobSucceeded) - -// By("Checking the ReplicaStatus is as expected") -// coorStatus := make([]int, 3) -// coorStatus[2] = 1 - -// colFinishedStatus := make([]int, 3) -// lrFinishedStatus := make([]int, 3) -// colFinishedStatus[0] = 0 -// colFinishedStatus[1] = colStatus[1] -// colFinishedStatus[2] = colStatus[0] + colStatus[2] -// lrFinishedStatus[0] = 0 -// lrFinishedStatus[1] = lrStatus[1] -// lrFinishedStatus[2] = lrStatus[0] + lrStatus[2] - -// replicasStatuses = map[div2alpha1.ReplicaType][]int{ -// div2alpha1.ReplicaTypeCoordinator: coorStatus, -// div2alpha1.ReplicaTypeCollector: colFinishedStatus, -// div2alpha1.ReplicaTypeLearner: lrFinishedStatus, -// } -// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses) - -// err = testutil.CleanUpJob(ctx, k8sClient, &dijob) -// Expect(err).NotTo(HaveOccurred()) -// } -// }) -// It("Should build right gpu ports and master port when the pod is ddp learner", func() { -// type replica struct { -// name string -// ddpLearnerType string -// gpus int -// expectedPorts int -// } - -// testCases := []replica{ -// {name: "job-ddp-learner-sdf", ddpLearnerType: dicommon.DDPLearnerTypeMaster, gpus: 4, expectedPorts: 5}, -// {name: "job-ddp-learner-sdf", ddpLearnerType: dicommon.DDPLearnerTypeWorker, gpus: 6, expectedPorts: 6}, -// {name: "job-ddp-learner-sdf", ddpLearnerType: dicommon.DDPLearnerTypeMaster, gpus: 1, expectedPorts: 2}, -// {name: "job-ddp-learner-sdf", ddpLearnerType: dicommon.DDPLearnerTypeMaster, gpus: 0, expectedPorts: 2}, -// {name: "job-ddp-learner-sdf", ddpLearnerType: dicommon.DDPLearnerTypeWorker, gpus: 0, expectedPorts: 1}, -// } -// for i := range testCases { -// c := testCases[i] -// By(fmt.Sprintf("Create %dth DIJob", i+1)) -// var err error -// ctx := context.Background() -// jobTmpl := testutil.NewDIJob() -// dijob, _ := createDIJob(ctx, k8sClient, jobTmpl) -// checkCoordinatorCreated(ctx, dijob) - -// // build owner reference -// ownRefer := diutil.NewOwnerReference(div2alpha1.GroupVersion.String(), div2alpha1.KindDIJob, dijob.Name, dijob.UID, true) - -// By(fmt.Sprintf("Create replicas for DIJob %s", dijob.Name)) -// pod := buildPod(c.name, dijob.Name, dicommon.DDPLearnerName, ownRefer) -// pod.Labels[dicommon.DDPLearnerTypeLabel] = c.ddpLearnerType -// resources := commontypes.ResourceQuantity{GPU: resource.MustParse(fmt.Sprint(c.gpus))} -// diutil.SetPodResources(pod, resources) - -// err = k8sClient.Create(ctx, pod, &client.CreateOptions{}) -// Expect(err).NotTo(HaveOccurred()) - -// By("Checking the # of service's ports are as expected") -// Eventually(func() int { -// svcs, err := diutil.ListServices(ctx, k8sClient, &dijob) -// Expect(err).NotTo(HaveOccurred()) - -// _, _, _, _, DDPLearners, err := diutil.ClassifyServices(svcs) -// 
Expect(err).NotTo(HaveOccurred()) -// if len(DDPLearners) == 0 { -// return -1 -// } -// return len(DDPLearners[0].Spec.Ports) -// }, timeout, interval).Should(Equal(c.expectedPorts)) - -// err = testutil.CleanUpJob(ctx, k8sClient, &dijob) -// Expect(err).NotTo(HaveOccurred()) -// } -// }) -// }) -// }) - -// func createDIJob(ctx context.Context, k8sClient client.Client, dijob *div2alpha1.DIJob) ( -// div2alpha1.DIJob, types.NamespacedName) { -// name := diutil.GenerateName(dijob.Name) -// dijob.SetName(name) - -// err := k8sClient.Create(ctx, dijob, &client.CreateOptions{}) -// Expect(err).ShouldNot(HaveOccurred()) - -// By(fmt.Sprintf("Checking the DIJob %s is successfully created", name)) -// key := types.NamespacedName{Namespace: dijob.Namespace, Name: dijob.Name} -// createdDIjob := div2alpha1.DIJob{} -// Eventually(func() bool { -// err := k8sClient.Get(ctx, key, &createdDIjob) -// return err == nil -// }, timeout, interval).Should(BeTrue()) - -// return createdDIjob, key -// } - -// func checkCoordinatorCreated(ctx context.Context, dijob div2alpha1.DIJob) { -// By("Checking coordinator are created") -// replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator") -// var pod corev1.Pod -// podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName} -// Eventually(func() bool { -// err := k8sClient.Get(ctx, podKey, &pod) -// return err == nil -// }, timeout, interval).Should(BeTrue()) -// } - -// func createAndUpdatePodPhase( -// ctx context.Context, k8sClient client.Client, -// name, jobName string, status corev1.PodPhase, replicaType string, -// ownRefer metav1.OwnerReference, statuses []int) { -// pod := buildPod(name, jobName, replicaType, ownRefer) -// createPodAndUpdatePhase(ctx, k8sClient, pod, status, statuses) -// } - -// func buildPod(name, jobName string, replicaType string, -// ownRefer metav1.OwnerReference) *corev1.Pod { -// pod := testutil.NewPod(name, jobName, ownRefer) -// labs := diutil.GenLabels(jobName) -// labs[dicommon.ReplicaTypeLabel] = replicaType -// labs[dicommon.PodNameLabel] = pod.Name -// pod.SetLabels(labs) - -// return pod -// } - -// func createPodAndUpdatePhase(ctx context.Context, k8sClient client.Client, -// pod *corev1.Pod, status corev1.PodPhase, statuses []int) { -// err := k8sClient.Create(ctx, pod, &client.CreateOptions{}) -// Expect(err).NotTo(HaveOccurred()) - -// podKey := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} -// testutil.UpdatePodPhase(ctx, k8sClient, podKey, status) - -// switch status { -// case corev1.PodRunning: -// statuses[0]++ -// case corev1.PodFailed: -// statuses[1]++ -// case corev1.PodSucceeded: -// statuses[2]++ -// } -// } - -// func checkDIJobPhase(ctx context.Context, k8sClient client.Client, jobKey types.NamespacedName, phase div2alpha1.Phase) { -// var dijob div2alpha1.DIJob -// Eventually(func() div2alpha1.Phase { -// err := k8sClient.Get(ctx, jobKey, &dijob) -// if err != nil { -// return div2alpha1.JobUnknown -// } -// return dijob.Status.Phase -// }, timeout, interval).Should(Equal(phase)) -// } - -// func checkReplicasStatuses(ctx context.Context, k8sClient client.Client, jobKey types.NamespacedName, replicasStatuses map[div2alpha1.ReplicaType][]int) { -// for rtype, status := range replicasStatuses { -// var dijob div2alpha1.DIJob -// Eventually(func() []int { -// err := k8sClient.Get(ctx, jobKey, &dijob) -// if err != nil { -// return nil -// } -// if dijob.Status.ReplicaStatus == nil { -// return nil -// } - -// result := make([]int, 3) -// result[0] = 
int(dijob.Status.ReplicaStatus[rtype].Active) -// result[1] = int(dijob.Status.ReplicaStatus[rtype].Failed) -// result[2] = int(dijob.Status.ReplicaStatus[rtype].Succeeded) -// return result -// }, timeout, interval).Should(Equal(status)) -// } -// } +import ( + "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1" + dicommon "opendilab.org/di-orchestrator/pkg/common" + dicontext "opendilab.org/di-orchestrator/pkg/context" + diutil "opendilab.org/di-orchestrator/pkg/utils" + testutil "opendilab.org/di-orchestrator/pkg/utils/testutils" +) + +var _ = Describe("DIJob Controller", func() { + + Context("When creating a DIJob", func() { + It("Should be succeeded", func() { + By("Create a DIJob") + var err error + jobTmpl := testutil.NewDIJob() + job, jobKey := createAndUpdateReplicas(ctx, jobTmpl) + + By("Check the created DIJob is in Starting state") + checkDIJobPhase(ctx, jobKey, div2alpha1.JobStarting) + + By("Update workers to Running") + for rank := 0; rank < int(job.Status.Replicas); rank++ { + replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), rank) + podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName} + err = testutil.UpdatePodPhase(ctx, podKey, corev1.PodRunning) + Expect(err).NotTo(HaveOccurred()) + } + + By("Checking the created DIJob has enough ready replicas") + readyReplicas := int(job.Status.Replicas) + checkReadyReplicas(ctx, jobKey, readyReplicas) + + By("Checking the created DIJob is in Running state") + checkDIJobPhase(ctx, jobKey, div2alpha1.JobRunning) + + By("Update workers to Succeeded") + for rank := 0; rank < int(job.Status.Replicas); rank++ { + replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), rank) + podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName} + err = testutil.UpdatePodPhase(ctx, podKey, corev1.PodSucceeded) + Expect(err).NotTo(HaveOccurred()) + } + + By("Checking the job is succeeded") + checkDIJobPhase(ctx, jobKey, div2alpha1.JobSucceeded) + + By("Checking there is not ready replicas") + readyReplicas = 0 + checkReadyReplicas(ctx, jobKey, readyReplicas) + + By("Cleaning up") + err = ctx.CleanUpJob(&job) + Expect(err).NotTo(HaveOccurred()) + }) + + It("DIJob status changed with worker status", func() { + type testCase struct { + workerStatus corev1.PodPhase + expectStatus div2alpha1.Phase + } + testCases := []testCase{ + {workerStatus: corev1.PodRunning, expectStatus: div2alpha1.JobRunning}, + {workerStatus: corev1.PodFailed, expectStatus: div2alpha1.JobFailed}, + {workerStatus: corev1.PodSucceeded, expectStatus: div2alpha1.JobSucceeded}, + } + for i := range testCases { + c := testCases[i] + + By(fmt.Sprintf("Create the %dth DIJob", i+1)) + var err error + jobTmpl := testutil.NewDIJob() + job, jobKey := createAndUpdateReplicas(ctx, jobTmpl) + + By("Update workers status") + for rank := 0; rank < int(job.Status.Replicas); rank++ { + replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), rank) + podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName} + err = testutil.UpdatePodPhase(ctx, podKey, c.workerStatus) + Expect(err).NotTo(HaveOccurred()) + } + + By("Checking the created DIJob's state") + checkDIJobPhase(ctx, jobKey, c.expectStatus) + + By("Cleaning up") + err = ctx.CleanUpJob(&job) + 
Expect(err).NotTo(HaveOccurred()) + } + }) + + It("Should be marked as Pending when submitted", func() { + By("Create a DIJob") + var err error + jobTmpl := testutil.NewDIJob() + jobTmpl.Spec.Template.Spec.Containers[0].Resources.Limits = make(corev1.ResourceList) + jobTmpl.Spec.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName("nvidia.com/gpu")] = + resource.MustParse("1m") + + job, jobKey := createDIJob(ctx, jobTmpl) + + By("Checking the created DIJob is in Pending state") + checkDIJobPhase(ctx, jobKey, div2alpha1.JobPending) + + By("Cleaning up") + err = ctx.CleanUpJob(&job) + Expect(err).NotTo(HaveOccurred()) + }) + }) + Context("When DIJob is starting", func() { + It("Should be Restarting when condition meat", func() { + type testCase struct { + replicas int + scheduled int + ready int + missed bool + rescheduled bool + expectPhase div2alpha1.Phase + expectRestart int32 + } + + testCases := []testCase{ + {replicas: 4, missed: false, scheduled: 4, ready: 4, rescheduled: false, expectPhase: div2alpha1.JobRunning, expectRestart: 0}, + {replicas: 4, missed: true, scheduled: 3, ready: 3, rescheduled: false, expectPhase: div2alpha1.JobStarting, expectRestart: 1}, + {replicas: 4, missed: false, scheduled: 3, ready: 3, rescheduled: false, expectPhase: div2alpha1.JobStarting, expectRestart: 0}, + {replicas: 4, missed: false, scheduled: 3, ready: 3, rescheduled: true, expectPhase: div2alpha1.JobStarting, expectRestart: 1}, + {replicas: 4, missed: true, scheduled: 0, ready: 0, rescheduled: false, expectPhase: div2alpha1.JobStarting, expectRestart: 1}, + {replicas: 4, missed: true, scheduled: 0, ready: 0, rescheduled: true, expectPhase: div2alpha1.JobStarting, expectRestart: 2}, + } + for i := range testCases { + c := testCases[i] + By(fmt.Sprintf("Create the %dth DIJob", i+1)) + var err error + jobTmpl := testutil.NewDIJob() + jobTmpl.Spec.MinReplicas = int32(c.replicas) + job, jobKey := createAndUpdateReplicas(ctx, jobTmpl) + + if c.missed { + By("Delete missed replicas") + rank := c.replicas - 1 + pod, err := getReplicaPod(ctx, jobKey, &job, rank, false) + Expect(err).NotTo(HaveOccurred()) + + err = ctx.Delete(context.TODO(), pod, &client.DeleteOptions{}) + Expect(err).NotTo(HaveOccurred()) + + By("Wait until generation changed") + Eventually(func() int { + ctx.Get(context.TODO(), jobKey, &job) + return int(job.Status.Generation) + }, timeout, interval).Should(Equal(1)) + + By("Wait until operator recreate all replicas") + Eventually(func() int { + pods, _ := ctx.ListJobPods(&job) + return len(pods) + }, timeout, interval).Should(Equal(c.replicas)) + } + + recreated := true + By("Update scheduled replicas") + for j := 0; j < c.scheduled; j++ { + pod, err := getReplicaPod(ctx, jobKey, &job, j, recreated) + Expect(err).NotTo(HaveOccurred()) + + pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{ + Type: corev1.PodScheduled, + Status: corev1.ConditionTrue, + }) + err = ctx.Status().Update(context.TODO(), pod, &client.UpdateOptions{}) + Expect(err).NotTo(HaveOccurred()) + } + + By("Update ready replicas") + for j := 0; j < c.ready; j++ { + pod, err := getReplicaPod(ctx, jobKey, &job, j, recreated) + Expect(err).NotTo(HaveOccurred()) + + pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, corev1.ContainerStatus{ + Ready: true, + }) + err = ctx.Status().Update(context.TODO(), pod, &client.UpdateOptions{}) + Expect(err).NotTo(HaveOccurred()) + } + + if c.rescheduled { + By("Mark pod rescheduled") + pod, err := getReplicaPod(ctx, jobKey, &job, 0, 
recreated) + Expect(err).NotTo(HaveOccurred()) + + pod.Annotations[dicommon.AnnotationReplicas] = fmt.Sprint(c.replicas + 1) + err = ctx.Update(context.TODO(), pod, &client.UpdateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + By("Wait until generation changed") + Eventually(func() int32 { + ctx.Get(context.TODO(), jobKey, &job) + return job.Status.Generation + }, timeout, interval).Should(Equal(c.expectRestart)) + + By("Wait until operator recreate all replicas") + Eventually(func() int { + pods, _ := ctx.ListJobPods(&job) + return len(pods) + }, timeout, interval).Should(Equal(c.replicas)) + } + + By("Check the created DIJob is in expected phase") + checkDIJobPhase(ctx, jobKey, c.expectPhase) + + By("Check the restart count is as expected") + err = ctx.Get(context.TODO(), jobKey, &job) + Expect(err).NotTo(HaveOccurred()) + Expect(job.Status.Generation).Should(Equal(c.expectRestart)) + + By("Cleaning up") + err = ctx.CleanUpJob(&job) + Expect(err).NotTo(HaveOccurred()) + } + + }) + }) + + Context("When DIJob is Running", func() { + It("Should be Restarting when condition meat", func() { + type testCase struct { + replicas int + missed bool + rescheduled bool + expectPhase div2alpha1.Phase + expectRestart int32 + } + + testCases := []testCase{ + {replicas: 4, missed: true, rescheduled: false, expectPhase: div2alpha1.JobStarting, expectRestart: 1}, + {replicas: 4, missed: true, rescheduled: true, expectPhase: div2alpha1.JobStarting, expectRestart: 2}, + {replicas: 3, missed: false, rescheduled: false, expectPhase: div2alpha1.JobRunning, expectRestart: 0}, + {replicas: 4, missed: false, rescheduled: true, expectPhase: div2alpha1.JobStarting, expectRestart: 1}, + } + for i := range testCases { + c := testCases[i] + By(fmt.Sprintf("Create the %dth DIJob", i+1)) + var err error + jobTmpl := testutil.NewDIJob() + jobTmpl.Spec.MinReplicas = int32(c.replicas) + job, jobKey := createAndUpdateReplicas(ctx, jobTmpl) + + By("Update workers to Running") + for rank := 0; rank < int(job.Status.Replicas); rank++ { + replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), rank) + podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName} + err = testutil.UpdatePodPhase(ctx, podKey, corev1.PodRunning) + Expect(err).NotTo(HaveOccurred()) + } + + By("Checking the created DIJob is in Running state") + checkDIJobPhase(ctx, jobKey, div2alpha1.JobRunning) + + if c.missed { + By("Delete missed replicas") + rank := c.replicas - 1 + pod, err := getReplicaPod(ctx, jobKey, &job, rank, false) + Expect(err).NotTo(HaveOccurred()) + + err = ctx.Delete(context.TODO(), pod, &client.DeleteOptions{}) + Expect(err).NotTo(HaveOccurred()) + + By("Wait until generation changed") + Eventually(func() int { + ctx.Get(context.TODO(), jobKey, &job) + return int(job.Status.Generation) + }, timeout, interval).Should(Equal(1)) + + By("Wait until operator recreate all replicas") + Eventually(func() int { + pods, _ := ctx.ListJobPods(&job) + return len(pods) + }, timeout, interval).Should(Equal(c.replicas)) + } + + recreated := true + if c.rescheduled { + By("Mark pod rescheduled") + pod, err := getReplicaPod(ctx, jobKey, &job, 0, recreated) + Expect(err).NotTo(HaveOccurred()) + + pod.Annotations[dicommon.AnnotationReplicas] = fmt.Sprint(c.replicas + 1) + err = ctx.Update(context.TODO(), pod, &client.UpdateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + By("Wait until generation changed") + Eventually(func() int32 { + ctx.Get(context.TODO(), jobKey, &job) + return job.Status.Generation + }, timeout, 
interval).Should(Equal(c.expectRestart)) + + By("Wait until operator recreate all replicas") + Eventually(func() int { + pods, _ := ctx.ListJobPods(&job) + return len(pods) + }, timeout, interval).Should(Equal(c.replicas)) + } + + By("Check the created DIJob is in expected phase") + checkDIJobPhase(ctx, jobKey, c.expectPhase) + + By("Check the restart count is as expected") + err = ctx.Get(context.TODO(), jobKey, &job) + Expect(err).NotTo(HaveOccurred()) + Expect(job.Status.Generation).Should(Equal(c.expectRestart)) + + By("Cleaning up") + err = ctx.CleanUpJob(&job) + Expect(err).NotTo(HaveOccurred()) + } + + }) + }) +}) + +func createAndUpdateReplicas(ctx dicontext.Context, jobTmpl *div2alpha1.DIJob) ( + div2alpha1.DIJob, types.NamespacedName) { + var err error + job, jobKey := createDIJob(ctx, jobTmpl) + + By("Sleep for a few time to wait for condition synced") + time.Sleep(100 * time.Millisecond) + + err = ctx.Get(context.TODO(), jobKey, &job) + Expect(err).NotTo(HaveOccurred()) + + By("Update status.replicas") + job.Status.Replicas = job.Spec.MinReplicas + err = ctx.Status().Update(context.TODO(), &job, &client.UpdateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + By("Check replicas created") + checkWorkersCreated(ctx, job) + return job, jobKey +} + +func createDIJob(ctx dicontext.Context, job *div2alpha1.DIJob) ( + div2alpha1.DIJob, types.NamespacedName) { + name := diutil.GenerateName(job.Name) + job.SetName(name) + key := types.NamespacedName{Namespace: job.Namespace, Name: job.Name} + createdDIjob := div2alpha1.DIJob{} + + err := ctx.Create(context.TODO(), job, &client.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + By(fmt.Sprintf("Checking the DIJob %s is successfully created", name)) + Eventually(func() bool { + err := ctx.Get(context.TODO(), key, &createdDIjob) + return err == nil + }, timeout, interval).Should(BeTrue()) + + return createdDIjob, key +} + +func checkWorkersCreated(ctx dicontext.Context, job div2alpha1.DIJob) { + Eventually(func() bool { + for i := 0; i < int(job.Spec.MinReplicas); i++ { + replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), i) + podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName} + var pod corev1.Pod + if err := ctx.Get(context.TODO(), podKey, &pod); err != nil { + return false + } + } + return true + }, timeout, interval).Should(BeTrue()) + +} + +func getReplicaPod(ctx dicontext.Context, jobKey types.NamespacedName, job *div2alpha1.DIJob, rank int, recreated bool) (*corev1.Pod, error) { + time.Sleep(10 * time.Millisecond) + var err error + err = ctx.Get(context.TODO(), jobKey, job) + if err != nil { + return nil, err + } + replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), rank) + if !recreated { + replicaName = diutil.ReplicaName(job.Name, 0, rank) + } + podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName} + var pod corev1.Pod + err = ctx.Get(context.TODO(), podKey, &pod) + if err != nil { + return nil, err + } + return &pod, nil +} + +func checkDIJobPhase(ctx dicontext.Context, jobKey types.NamespacedName, phase div2alpha1.Phase) { + var job div2alpha1.DIJob + Eventually(func() div2alpha1.Phase { + err := ctx.Get(context.TODO(), jobKey, &job) + if err != nil { + return div2alpha1.JobUnknown + } + return job.Status.Phase + }, timeout, interval).Should(Equal(phase)) +} + +func checkReadyReplicas(ctx dicontext.Context, jobKey types.NamespacedName, readyReplicas int) { + var job div2alpha1.DIJob + Eventually(func() int { + err := 
ctx.Get(context.TODO(), jobKey, &job) + if err != nil { + return -1 + } + return int(job.Status.ReadyReplicas) + }, timeout, interval).Should(Equal(readyReplicas)) +} diff --git a/pkg/controllers/dijob_test.go b/pkg/controllers/dijob_test.go index 7b7408adffcc11813ecdfa8418f1dafea12e074f..63f9172e80a70ba40fa7a2b93879a3d42686bf17 100644 --- a/pkg/controllers/dijob_test.go +++ b/pkg/controllers/dijob_test.go @@ -1,174 +1,223 @@ package controllers -// import ( -// "context" -// "fmt" +import ( + "fmt" + "strings" -// . "github.com/onsi/ginkgo" -// . "github.com/onsi/gomega" -// corev1 "k8s.io/api/core/v1" -// "k8s.io/apimachinery/pkg/types" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" -// div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1" -// dicommon "opendilab.org/di-orchestrator/pkg/common" -// diutil "opendilab.org/di-orchestrator/pkg/utils" -// testutil "opendilab.org/di-orchestrator/pkg/utils/testutils" -// ) + div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1" + dicommon "opendilab.org/di-orchestrator/pkg/common" + diutil "opendilab.org/di-orchestrator/pkg/utils" + testutil "opendilab.org/di-orchestrator/pkg/utils/testutils" +) -// var _ = Describe("DIJob Specification", func() { +var _ = Describe("DIJob Specification", func() { -// Context("When creating a DIJob with different CleanPodPolicy", func() { -// It("Should execute different pods deletion policy with different CleanPodPolicy", func() { -// cleanPodPolicies := []div2alpha1.CleanPodPolicy{ -// div2alpha1.CleanPodPolicyAll, -// div2alpha1.CleanPodPolicyRunning, -// div2alpha1.CleanPodPolicyNone, -// } -// for _, policy := range cleanPodPolicies { -// type replica struct { -// name string -// status corev1.PodPhase -// } -// type testCase struct { -// runnings int -// collectors []replica -// learners []replica -// } -// testCases := []testCase{ -// { -// runnings: 2, -// collectors: []replica{ -// {name: "job-collector-sdf", status: corev1.PodRunning}, -// }, -// learners: []replica{ -// {name: "job-learner-sdf", status: corev1.PodRunning}, -// }, -// }, -// { -// runnings: 2, -// collectors: []replica{ -// {name: "job-collector-sdf", status: corev1.PodRunning}, -// {name: "job-collector-4tf", status: corev1.PodFailed}, -// }, -// learners: []replica{ -// {name: "job-learner-sdf", status: corev1.PodRunning}, -// }, -// }, -// { -// runnings: 2, -// collectors: []replica{ -// {name: "job-collector-sdf", status: corev1.PodRunning}, -// {name: "job-collector-4tf", status: corev1.PodFailed}, -// }, -// learners: []replica{ -// {name: "job-learner-sdf", status: corev1.PodSucceeded}, -// {name: "job-learner-s4t", status: corev1.PodRunning}, -// }, -// }, -// } -// for i := range testCases { -// c := testCases[i] -// By(fmt.Sprintf("Create %dth DIJob", i+1)) -// var err error -// ctx := context.Background() -// jobTmpl := testutil.NewDIJob() -// jobTmpl.Spec.CleanPodPolicy = policy -// dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl) -// checkCoordinatorCreated(ctx, dijob) + Context("Test DIJob Specification", func() { + It("Should execute different pods deletion policy with different CleanPodPolicy", func() { + cleanPodPolicies := []div2alpha1.CleanPodPolicy{ + div2alpha1.CleanPodPolicyAll, + div2alpha1.CleanPodPolicyRunning, + div2alpha1.CleanPodPolicyNone, + } + for _, policy := range cleanPodPolicies { + type testCase struct { + runnings int // pending pods are also considered as running pods + replicaStatues []corev1.PodPhase + } 
+ testCases := []testCase{ + { + runnings: 2, + replicaStatues: []corev1.PodPhase{ + corev1.PodRunning, corev1.PodRunning, corev1.PodFailed, corev1.PodSucceeded, + }, + }, + { + runnings: 0, + replicaStatues: []corev1.PodPhase{ + corev1.PodFailed, corev1.PodSucceeded, corev1.PodFailed, + }, + }, + { + runnings: 3, + replicaStatues: []corev1.PodPhase{ + corev1.PodPending, corev1.PodRunning, corev1.PodFailed, corev1.PodRunning, + }, + }, + { + runnings: 0, + replicaStatues: []corev1.PodPhase{ + corev1.PodFailed, corev1.PodFailed, corev1.PodFailed, + }, + }, + { + runnings: 0, + replicaStatues: []corev1.PodPhase{ + corev1.PodSucceeded, corev1.PodSucceeded, corev1.PodSucceeded, + }, + }, + } + for i := range testCases { + c := testCases[i] + By(fmt.Sprintf("Create %dth DIJob", i+1)) + var err error + jobTmpl := testutil.NewDIJob() + jobTmpl.Spec.MinReplicas = int32(len(c.replicaStatues)) + jobTmpl.Spec.CleanPodPolicy = policy + job, jobKey := createAndUpdateReplicas(ctx, jobTmpl) -// // build owner reference -// ownRefer := diutil.NewOwnerReference(div2alpha1.GroupVersion.String(), div2alpha1.KindDIJob, dijob.Name, dijob.UID, true) -// By(fmt.Sprintf("ownRefer: %s %s", ownRefer.APIVersion, ownRefer.Kind)) -// colStatus := make([]int, 3) -// for _, col := range c.collectors { -// By(fmt.Sprintf("Create pod %s", col.name)) -// createAndUpdatePodPhase(ctx, k8sClient, col.name, dijob.Name, col.status, dicommon.CollectorName, ownRefer, colStatus) -// } + By("Check the created DIJob is in Starting state") + checkDIJobPhase(ctx, jobKey, div2alpha1.JobStarting) -// lrStatus := make([]int, 3) -// for _, lr := range c.learners { -// By(fmt.Sprintf("Create pod %s", lr.name)) -// createAndUpdatePodPhase(ctx, k8sClient, lr.name, dijob.Name, lr.status, dicommon.LearnerName, ownRefer, lrStatus) -// } + By("Update workers status") + for rank := 0; rank < len(c.replicaStatues); rank++ { + replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), rank) + podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName} + err = testutil.UpdatePodPhase(ctx, podKey, c.replicaStatues[rank]) + Expect(err).NotTo(HaveOccurred()) + } -// By("Get the number of pods") -// pods, err := diutil.ListPods(ctx, k8sClient, &dijob) -// Expect(err).NotTo(HaveOccurred()) -// npods := len(pods) + By("Get the number of pods") + pods, err := ctx.ListJobPods(&job) + Expect(err).NotTo(HaveOccurred()) + npods := len(pods) -// By("Update coordinator to Succeeded") -// for _, replicaName := range []string{ -// diutil.ReplicaPodName(dijob.Name, "coordinator"), -// } { -// podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName} -// err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodSucceeded) -// Expect(err).NotTo(HaveOccurred()) -// } + By("Checking all the pods and services are deleted") -// By("Checking the job is succeeded") -// Eventually(func() div2alpha1.Phase { -// err := k8sClient.Get(ctx, jobKey, &dijob) -// if err != nil { -// return div2alpha1.JobUnknown -// } -// return dijob.Status.Phase -// }, timeout, interval).Should(Equal(div2alpha1.JobSucceeded)) + switch policy { + case div2alpha1.CleanPodPolicyAll: + Eventually(func() int { + pods, err := ctx.ListJobPods(&job) + if err != nil { + return -1 + } + return len(pods) + }, timeout, interval).Should(Equal(0)) + Eventually(func() int { + svcs, err := ctx.ListJobServices(&job) + if err != nil { + return -1 + } + return len(svcs) + }, timeout, interval).Should(Equal(0)) + case div2alpha1.CleanPodPolicyNone: + 
Consistently(func() int { + pods, err := ctx.ListJobPods(&job) + if err != nil { + return -1 + } + return len(pods) + }, duration, interval).Should(Equal(npods)) + Eventually(func() int { + svcs, err := ctx.ListJobServices(&job) + if err != nil { + return -1 + } + return len(svcs) + }, timeout, interval).Should(Equal(0)) + case div2alpha1.CleanPodPolicyRunning: + Eventually(func() int { + pods, err := ctx.ListJobPods(&job) + if err != nil { + return -1 + } + return len(pods) + }, timeout, interval).Should(Equal(npods - c.runnings)) + Eventually(func() int { + svcs, err := ctx.ListJobServices(&job) + if err != nil { + return -1 + } + return len(svcs) + }, timeout, interval).Should(Equal(0)) + } -// By("Checking all the pods and services are deleted") + By("Clean up pods") + err = ctx.CleanUpJob(&job) + Expect(err).NotTo(HaveOccurred()) + } + } + }) + It("Should create replicas with different connections relying on topology and parallel workers", func() { + type testCase struct { + topology div2alpha1.Topology + replicas int + paralleWorkers int + expectAttachedNodes int + } + testCases := []testCase{ + { + topology: div2alpha1.TopologyAlone, replicas: 1, paralleWorkers: 1, expectAttachedNodes: 0, + }, + { + topology: div2alpha1.TopologyAlone, replicas: 2, paralleWorkers: 3, expectAttachedNodes: 0, + }, + { + topology: div2alpha1.TopologyStar, replicas: 1, paralleWorkers: 1, expectAttachedNodes: 0, + }, + { + topology: div2alpha1.TopologyStar, replicas: 2, paralleWorkers: 3, expectAttachedNodes: 1, + }, + { + topology: div2alpha1.TopologyStar, replicas: 3, paralleWorkers: 3, expectAttachedNodes: 2, + }, + { + topology: div2alpha1.TopologyStar, replicas: 3, paralleWorkers: 4, expectAttachedNodes: 2, + }, + { + topology: div2alpha1.TopologyMesh, replicas: 1, paralleWorkers: 1, expectAttachedNodes: 0, + }, + { + topology: div2alpha1.TopologyMesh, replicas: 2, paralleWorkers: 3, expectAttachedNodes: 3, + }, + { + topology: div2alpha1.TopologyMesh, replicas: 3, paralleWorkers: 3, expectAttachedNodes: 9, + }, + { + topology: div2alpha1.TopologyMesh, replicas: 3, paralleWorkers: 4, expectAttachedNodes: 12, + }, + } + for i := range testCases { + c := testCases[i] + By(fmt.Sprintf("Create %dth DIJob", i+1)) + var err error + jobTmpl := testutil.NewDIJob() + jobTmpl.Spec.MinReplicas = int32(c.replicas) + jobTmpl.Spec.EngineFields.ParallelWorkers = int32(c.paralleWorkers) + jobTmpl.Spec.EngineFields.Topology = c.topology + job, jobKey := createAndUpdateReplicas(ctx, jobTmpl) -// switch policy { -// case div2alpha1.CleanPodPolicyAll: -// Eventually(func() int { -// pods, err := diutil.ListPods(ctx, k8sClient, &dijob) -// if err != nil { -// return -1 -// } -// return len(pods) -// }, timeout, interval).Should(Equal(0)) -// Eventually(func() int { -// svcs, err := diutil.ListServices(ctx, k8sClient, &dijob) -// if err != nil { -// return -1 -// } -// return len(svcs) -// }, timeout, interval).Should(Equal(0)) -// case div2alpha1.CleanPodPolicyNone: -// Consistently(func() int { -// pods, err := diutil.ListPods(ctx, k8sClient, &dijob) -// if err != nil { -// return -1 -// } -// return len(pods) -// }, duration, interval).Should(Equal(npods)) -// Eventually(func() int { -// svcs, err := diutil.ListServices(ctx, k8sClient, &dijob) -// if err != nil { -// return -1 -// } -// return len(svcs) -// }, duration, interval).Should(Equal(0)) -// case div2alpha1.CleanPodPolicyRunning: -// Eventually(func() int { -// pods, err := diutil.ListPods(ctx, k8sClient, &dijob) -// if err != nil { -// return -1 -// } -// 
return len(pods) -// }, duration, interval).Should(Equal(npods - c.runnings)) -// Eventually(func() int { -// svcs, err := diutil.ListServices(ctx, k8sClient, &dijob) -// if err != nil { -// return -1 -// } -// return len(svcs) -// }, duration, interval).Should(Equal(0)) -// } + By("Check the created DIJob is in Starting state") + checkDIJobPhase(ctx, jobKey, div2alpha1.JobStarting) -// By("Clean up pods") -// err = testutil.CleanUpJob(ctx, k8sClient, dijob.DeepCopy()) -// Expect(err).NotTo(HaveOccurred()) -// } -// } -// }) -// }) -// }) + By("Check workers' attached nodes are as expected") + Eventually(func() int { + pods, err := ctx.ListJobPods(&job) + if err != nil { + return -1 + } + attachedNodes := 0 + for _, pod := range pods { + for _, env := range pod.Spec.Containers[0].Env { + if env.Name == dicommon.ENVAttachedNodesArg { + if env.Value == "" { + continue + } + attachedNodes += len(strings.Split(env.Value, ",")) + } + } + } + return attachedNodes + }, timeout, interval).Should(Equal(c.expectAttachedNodes)) + + By("Clean up pods") + err = ctx.CleanUpJob(&job) + Expect(err).NotTo(HaveOccurred()) + } + }) + }) +}) diff --git a/pkg/controllers/suite_test.go b/pkg/controllers/suite_test.go index 73f7d5f9c0acda4a19616af9b1ed16daed7b30f6..de33986d22907648937abbc920e821f049c5a454 100644 --- a/pkg/controllers/suite_test.go +++ b/pkg/controllers/suite_test.go @@ -21,13 +21,13 @@ import ( "fmt" "path/filepath" "testing" + "time" . "github.com/onsi/ginkgo" "github.com/onsi/ginkgo/config" . "github.com/onsi/gomega" "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/envtest/printer" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -42,20 +42,20 @@ import ( // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. 
const ( -// timeout = 5 * time.Second -// interval = 250 * time.Millisecond -// duration = 200 * time.Millisecond + timeout = 5 * time.Second + interval = 250 * time.Millisecond + duration = 200 * time.Millisecond ) // var cfg *rest.Config -var k8sClient client.Client +var ctx dicontext.Context var testEnv *envtest.Environment func TestControllers(t *testing.T) { RegisterFailHandler(Fail) RunSpecsWithDefaultAndCustomReporters(t, - "Controller Suite", + "DI-Controller Suite", []Reporter{printer.NewlineReporter{}}) } @@ -77,10 +77,6 @@ var _ = BeforeSuite(func() { //+kubebuilder:scaffold:scheme - k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) - Expect(err).NotTo(HaveOccurred()) - Expect(k8sClient).NotTo(BeNil()) - // create controller manager metricPort := config.GinkgoConfig.ParallelNode + 8200 metricAddress := fmt.Sprintf(":%d", metricPort) @@ -90,11 +86,11 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred()) - ctx := dicontext.NewContext(context.Background(), + ctx = dicontext.NewContext(context.Background(), cfg, k8sManager.GetClient(), - k8sManager.GetEventRecorderFor("di-operator"), - ctrl.Log.WithName("di-operator")) + k8sManager.GetEventRecorderFor("controller"), + ctrl.Log.WithName("controller")) reconciler := NewDIJobReconciler(k8sManager.GetScheme(), ctx) if err = reconciler.SetupWithManager(k8sManager); err != nil { Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/server/suite_test.go b/pkg/server/suite_test.go index 7a08afbc0df1b95d720103a1263b0a0d59b33759..16d9a55633dc2a86b3afecde2030a5fa9c75a160 100644 --- a/pkg/server/suite_test.go +++ b/pkg/server/suite_test.go @@ -68,7 +68,7 @@ func TestServer(t *testing.T) { RegisterFailHandler(Fail) RunSpecsWithDefaultAndCustomReporters(t, - "Server Suite", + "DI-Server Suite", []Reporter{printer.NewlineReporter{}}) } diff --git a/pkg/utils/testutils/dijob.go b/pkg/utils/testutils/dijob.go index 56d4defb6729c50e6596763ca512f895405d8851..d0aa23249c80c7a3bbe422bb475967236d6c9a11 100644 --- a/pkg/utils/testutils/dijob.go +++ b/pkg/utils/testutils/dijob.go @@ -19,6 +19,9 @@ func NewDIJob() *div2alpha1.DIJob { Namespace: DIJobNamespace, }, Spec: div2alpha1.DIJobSpec{ + MinReplicas: 1, + MaxReplicas: 4, + Preemptible: false, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{ diff --git a/pkg/utils/testutils/pod.go b/pkg/utils/testutils/pod.go index 584f0af60b59fadbc218f7076a73d2abd60eed50..3319ab593f29e74ad5224d3de0a59949907ba637 100644 --- a/pkg/utils/testutils/pod.go +++ b/pkg/utils/testutils/pod.go @@ -15,9 +15,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" dicommon "opendilab.org/di-orchestrator/pkg/common" + dicontext "opendilab.org/di-orchestrator/pkg/context" ) -func NewPod(name, jobName string, ownRefer metav1.OwnerReference) *corev1.Pod { +func NewPod(name, namespace string, ownRefer metav1.OwnerReference) *corev1.Pod { pod := &corev1.Pod{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -25,7 +26,7 @@ func NewPod(name, jobName string, ownRefer metav1.OwnerReference) *corev1.Pod { }, ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: DIJobNamespace, + Namespace: namespace, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{ @@ -41,9 +42,9 @@ func NewPod(name, jobName string, ownRefer metav1.OwnerReference) *corev1.Pod { return pod } -func UpdatePodPhase(ctx context.Context, k8sClient client.Client, podKey types.NamespacedName, phase corev1.PodPhase) error { +func UpdatePodPhase(ctx dicontext.Context, podKey types.NamespacedName, phase 
corev1.PodPhase) error { var pod corev1.Pod - err := k8sClient.Get(ctx, podKey, &pod) + err := ctx.Get(context.TODO(), podKey, &pod) if err != nil { return err } @@ -54,11 +55,11 @@ func UpdatePodPhase(ctx context.Context, k8sClient client.Client, podKey types.N state := corev1.ContainerStateRunning{} cstatus := corev1.ContainerStatus{Name: containerName, State: corev1.ContainerState{ Running: &state, - }} + }, Ready: true} pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, cstatus) } - err = k8sClient.Status().Update(ctx, &pod, &client.UpdateOptions{}) + err = ctx.Status().Update(context.TODO(), &pod, &client.UpdateOptions{}) if err != nil { return err }
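Note on a helper the refactored tests rely on: the controller tests in this patch build worker pod keys with diutil.ReplicaName(job.Name, int(job.Status.Generation), rank), but that helper's body is not part of the diff. A minimal sketch of the assumed naming contract follows; the "<job>-<generation>-<rank>" format string is an illustration only, not necessarily what pkg/utils actually emits.

package diutil

import "fmt"

// ReplicaName returns a deterministic pod name for a worker of the given job,
// restart generation, and rank. The tests in pkg/controllers fetch pods by
// exactly this name after each restart, so the real helper must produce a
// name that is stable for a given (job, generation, rank) triple.
// NOTE: the format below is an assumed example for illustration only.
func ReplicaName(jobName string, generation, rank int) string {
	return fmt.Sprintf("%s-%d-%d", jobName, generation, rank)
}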