Commit b84bbe46 authored by liqingping

Merge branch 'develop' into 'master'

Develop

See merge request platform/CloudNative4AI/cluster-lifecycle/di-orchestrator!63
......@@ -97,6 +97,7 @@ tag:
- cloudnative4ai-group-runner-phoenix
services:
- registry.sensetime.com/cloudnative4ai/docker:19.03.8-dind
when: manual
allow_failure: false
dependencies:
- build-release
......
......@@ -85,10 +85,11 @@ lint:
.PHONY: test
test: ginkgo ## Run tests.
$(GINKGO) -nodes 4 -v -cover -coverprofile=coverage.out ./pkg/...
go tool cover -func=./pkg/controllers/coverage.out
# $(GINKGO) -nodes 4 -v -cover -coverprofile=coverage.out ./pkg/...
$(GINKGO) -cover -coverprofile=coverage.out ./pkg/...
go tool cover -func=./pkg/server/coverage.out
go tool cover -func=./pkg/common/gpuallocator/coverage.out
go tool cover -func=./pkg/common/coverage.out
go tool cover -func=./pkg/controllers/coverage.out
##@ Build
......
......@@ -68,7 +68,7 @@ func NewCmdServer(genFlags cmdcommon.GenericFlags) *cobra.Command {
Examples:
# Start di-server with gpu allocation policy and bind address specified.
di-orchestrator server -p simple -b :8080
di-orchestrator server -p :8080 -s :8081
`,
Run: func(cmd *cobra.Command, args []string) {
cobra.CheckErr(runCommand(cmd, o))
......
package common
import (
"fmt"
"os"
"testing"
......@@ -14,16 +15,33 @@ func TestConfig(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecsWithDefaultAndCustomReporters(t,
"Config Suite",
"CommonConfig Suite",
[]Reporter{printer.NewlineReporter{}})
}
var _ = Describe("GetDIJobDefaultResources", func() {
defaultResource := `{"resources": {"requests": {"cpu": 1, "memory": "2Gi"}}}`
err := os.Setenv("DI_JOB_DEFAULT_RESOURCES", defaultResource)
Expect(err).NotTo(HaveOccurred())
r, err := GetDIJobDefaultResources()
Expect(err).NotTo(HaveOccurred())
Expect(r.Requests.Cpu().Equal(resource.MustParse("1"))).Should(BeTrue())
Expect(r.Requests.Memory().Equal(resource.MustParse("2Gi"))).Should(BeTrue())
var _ = Describe("Test common config", func() {
Context("Get DIJob default resources", func() {
It("returns the default resources", func() {
type testCase struct {
resource string
expectCPU string
expectMem string
}
testCases := []testCase{
{resource: `{"resources": {"requests": {"cpu": 1, "memory": "2Gi"}}}`, expectCPU: "1", expectMem: "2Gi"},
{resource: `{"resources": {"requests": {"cpu": 2, "memory": "3Gi"}}}`, expectCPU: "2", expectMem: "3Gi"},
{resource: "", expectCPU: "1", expectMem: "2Gi"},
}
for i := range testCases {
c := testCases[i]
By(fmt.Sprintf("Create the %dth DIJob", i+1))
err := os.Setenv("DI_JOB_DEFAULT_RESOURCES", c.resource)
Expect(err).NotTo(HaveOccurred())
r, err := GetDIJobDefaultResources()
Expect(err).NotTo(HaveOccurred())
Expect(r.Requests.Cpu().Equal(resource.MustParse(c.expectCPU))).Should(BeTrue())
Expect(r.Requests.Memory().Equal(resource.MustParse(c.expectMem))).Should(BeTrue())
}
})
})
})
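A minimal usage sketch of GetDIJobDefaultResources, inferred from the test cases above; it assumes the return value exposes corev1.ResourceRequirements-style Requests and that an empty DI_JOB_DEFAULT_RESOURCES falls back to cpu=1, memory=2Gi:
// Hedged sketch: behaviour inferred from the CommonConfig tests above.
os.Setenv("DI_JOB_DEFAULT_RESOURCES", `{"resources": {"requests": {"cpu": 2, "memory": "3Gi"}}}`)
res, err := GetDIJobDefaultResources()
if err != nil {
panic(err)
}
// Prints "cpu=2 memory=3Gi"; with the variable unset, the tests expect cpu=1, memory=2Gi.
fmt.Printf("cpu=%s memory=%s\n", res.Requests.Cpu().String(), res.Requests.Memory().String())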
package gpuallocator
import (
"fmt"
corev1 "k8s.io/api/core/v1"
)
type GPUAllocator struct {
Nodes []*corev1.Node
policy Policy
}
func NewSimpleGPUAllocator(nodes []*corev1.Node) *GPUAllocator {
return &GPUAllocator{Nodes: nodes, policy: NewSimplePolicy()}
}
func (g *GPUAllocator) Allocate(gpus int) []int {
return g.policy.Allocate(g.Nodes, gpus)
}
func (g *GPUAllocator) NumGPUsOfMajorityNodeType() int {
return GetGPUsMajority(g.Nodes)
}
const (
SimpleGPUAllocPolicy = "simple"
)
type Policy interface {
Allocate(nodes []*corev1.Node, gpus int) []int
}
type SimplePolicy struct{}
func NewSimplePolicy() *SimplePolicy {
return &SimplePolicy{}
}
func (s *SimplePolicy) Allocate(nodes []*corev1.Node, gpus int) []int {
// gpusMajority is the per-node GPU count that occurs most frequently
// among the given nodes; it is used as the chunk size for allocation.
gpusMajority := GetGPUsMajority(nodes)
if gpusMajority <= 0 {
return nil
}
perNodeGPUs := Max(gpusMajority, 1)
if gpus < perNodeGPUs {
return []int{gpus}
}
var result []int
nResults := gpus / perNodeGPUs
for i := 0; i < nResults; i++ {
result = append(result, perNodeGPUs)
}
remainGPUs := gpus - nResults*perNodeGPUs
if remainGPUs > 0 {
result = append(result, remainGPUs)
}
return result
}
func Max(x, y int) int {
if x < y {
return y
}
return x
}
func MaxInArray(v []int) (int, error) {
if len(v) == 0 {
return 0, fmt.Errorf("empty list")
}
max := v[0]
for _, i := range v {
if i > max {
max = i
}
}
return max, nil
}
func GetGPUsMajority(nodes []*corev1.Node) int {
var nodeGPUCounts []int
for _, node := range nodes {
allocGPUs := node.Status.Allocatable[corev1.ResourceName("nvidia.com/gpu")]
nodeGPUCounts = append(nodeGPUCounts, int(allocGPUs.Value()))
}
// gpusMajority is the GPU count shared by the largest number of nodes;
// the second return value (how many nodes have that count) is not needed here.
gpusMajority, _ := ValueOccursMostFrequentInList(nodeGPUCounts)
if gpusMajority == 0 {
max, _ := MaxInArray(nodeGPUCounts)
return max
}
return gpusMajority
}
// ValueOccursMostFrequentInList returns the value that occurs most frequently
// in the list and its number of occurrences; ties are broken in favor of the larger value.
func ValueOccursMostFrequentInList(list []int) (int, int) {
if len(list) == 0 {
return 0, 0
}
// map the occurrence frequency of each value
maxCount := 0
maxCountValue := 0
valuesMap := make(map[int]int)
for _, v := range list {
if valuesMap[v] != 0 {
valuesMap[v]++
} else {
valuesMap[v] = 1
}
if maxCount < valuesMap[v] {
maxCount = valuesMap[v]
maxCountValue = v
} else if maxCount == valuesMap[v] && maxCountValue < v {
maxCountValue = v
}
}
return maxCountValue, maxCount
}
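A minimal usage sketch of the simple allocation policy above, consistent with the allocator tests that follow (newNode is the test helper defined below): a request is split into chunks equal to the majority per-node GPU count, with any remainder emitted as a final smaller chunk.
// Sketch only: four 8-GPU nodes form the majority, so the chunk size is 8.
var nodes []*corev1.Node
for i := 0; i < 4; i++ {
nodes = append(nodes, newNode(8))
}
for i := 0; i < 3; i++ {
nodes = append(nodes, newNode(10))
nodes = append(nodes, newNode(6))
}
alloc := NewSimpleGPUAllocator(nodes)
fmt.Println(alloc.Allocate(12)) // [8 4]: one full 8-GPU chunk plus a 4-GPU remainder
fmt.Println(alloc.Allocate(16)) // [8 8]: two full 8-GPU chunks
fmt.Println(alloc.Allocate(4))  // [4]: request smaller than the chunk size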
package gpuallocator
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
)
func TestAllocators(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecsWithDefaultAndCustomReporters(t,
"Allocator Suite",
[]Reporter{printer.NewlineReporter{}})
}
var _ = Describe("Test SimpleGPUAllocator", func() {
It("ValueOccursMostFrequentInList function", func() {
testCases := map[string]struct {
list []int
expectedValue int
expectedCount int
}{
"Only one max value": {
[]int{1, 2, 3, 4, 5, 2, 2, 4, 6, 2, 3, 3, 1},
2, 4,
},
"Multi max value": {
[]int{1, 2, 3, 4, 5, 2, 2, 4, 6, 2, 3, 3, 1, 3},
3, 4,
},
"Multi max value second": {
[]int{1, 3, 3, 4, 5, 2, 2, 4, 6, 2, 3, 2, 1, 3},
3, 4,
},
}
for _, test := range testCases {
maxValue, maxCount := ValueOccursMostFrequentInList(test.list)
Expect(maxValue).To(Equal(test.expectedValue))
Expect(maxCount).To(Equal(test.expectedCount))
}
})
It("Allocate function", func() {
testCases := map[string]struct {
nodeGPUs map[int]int
gpus int
result []int
}{
"Only one max value with 12 gpus request": {
map[int]int{
8: 4,
10: 3,
6: 3,
},
12, []int{8, 4},
},
"Only one max value with 16 gpus request": {
map[int]int{
8: 4,
10: 3,
6: 3,
},
16, []int{8, 8},
},
"Multi max value with 16 gpus request": {
map[int]int{
8: 4,
10: 4,
6: 3,
},
16, []int{10, 6},
},
"Multi max value with 8 gpus request": {
map[int]int{
8: 4,
10: 4,
6: 3,
},
8, []int{8},
},
}
for _, test := range testCases {
var nodes []*corev1.Node
for nodeSpec, nodeGPUs := range test.nodeGPUs {
for i := 0; i < nodeGPUs; i++ {
nodes = append(nodes, newNode(nodeSpec))
}
}
alloc := NewSimpleGPUAllocator(nodes)
result := alloc.Allocate(test.gpus)
Expect(result).To(Equal(test.result))
}
})
})
func newNode(gpus int) *corev1.Node {
return &corev1.Node{
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
"nvidia.com/gpu": *resource.NewQuantity(int64(gpus), resource.DecimalExponent),
},
},
}
}
package controllers
// import (
// "context"
// "fmt"
// . "github.com/onsi/ginkgo"
// . "github.com/onsi/gomega"
// corev1 "k8s.io/api/core/v1"
// "k8s.io/apimachinery/pkg/api/resource"
// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// "k8s.io/apimachinery/pkg/types"
// "sigs.k8s.io/controller-runtime/pkg/client"
// div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1"
// dicommon "opendilab.org/di-orchestrator/pkg/common"
// commontypes "opendilab.org/di-orchestrator/pkg/common/types"
// diutil "opendilab.org/di-orchestrator/pkg/utils"
// testutil "opendilab.org/di-orchestrator/pkg/utils/testutils"
// )
// var _ = Describe("DIJob Controller", func() {
// Context("When creating a DIJob", func() {
// It("Should be succeeded", func() {
// By("Create a DIJob")
// var err error
// ctx := context.Background()
// jobTmpl := testutil.NewDIJob()
// dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl)
// checkCoordinatorCreated(ctx, dijob)
// By("Checking the created DIJob is in Created state")
// checkDIJobPhase(ctx, k8sClient, jobKey, div2alpha1.JobPending)
// replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator")
// podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName}
// By("Update coordinator to Running")
// err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodRunning)
// Expect(err).NotTo(HaveOccurred())
// By("Checking the created DIJob has enough coordinator")
// coorStatus := make([]int, 3)
// coorStatus[0] = 1
// replicasStatuses := map[div2alpha1.ReplicaType][]int{
// div2alpha1.ReplicaTypeCoordinator: coorStatus,
// }
// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses)
// By("Checking the created DIJob is in Running state")
// checkDIJobPhase(ctx, k8sClient, jobKey, div2alpha1.JobRunning)
// By("Update coordinator to Succeeded")
// err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodSucceeded)
// Expect(err).NotTo(HaveOccurred())
// By("Checking the job is succeeded")
// checkDIJobPhase(ctx, k8sClient, jobKey, div2alpha1.JobSucceeded)
// By("Checking the coordinator is succeeded")
// coorStatus = make([]int, 3)
// coorStatus[2] = 1
// replicasStatuses = map[div2alpha1.ReplicaType][]int{
// div2alpha1.ReplicaTypeCoordinator: coorStatus,
// }
// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses)
// By("Cleaning up")
// err = testutil.CleanUpJob(ctx, k8sClient, &dijob)
// Expect(err).NotTo(HaveOccurred())
// })
// It("DIJob status changed with components status", func() {
// type testCase struct {
// coorStatus corev1.PodPhase
// expectStatus div2alpha1.Phase
// }
// testCases := []testCase{
// {coorStatus: corev1.PodRunning, expectStatus: div2alpha1.JobRunning},
// {coorStatus: corev1.PodFailed, expectStatus: div2alpha1.JobFailed},
// {coorStatus: corev1.PodSucceeded, expectStatus: div2alpha1.JobSucceeded},
// }
// for i := range testCases {
// c := testCases[i]
// By(fmt.Sprintf("Create the %dth DIJob", i+1))
// var err error
// ctx := context.Background()
// jobTmpl := testutil.NewDIJob()
// dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl)
// checkCoordinatorCreated(ctx, dijob)
// replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator")
// podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName}
// By("Update coordinator status")
// err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, c.coorStatus)
// Expect(err).NotTo(HaveOccurred())
// By("Checking the created DIJob has enough coordinator")
// coorStatus := make([]int, 3)
// switch c.coorStatus {
// case corev1.PodRunning:
// coorStatus[0] = 1
// case corev1.PodFailed:
// coorStatus[1] = 1
// case corev1.PodSucceeded:
// coorStatus[2] = 1
// }
// replicasStatuses := map[div2alpha1.ReplicaType][]int{
// div2alpha1.ReplicaTypeCoordinator: coorStatus,
// }
// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses)
// By("Checking the created DIJob's state")
// checkDIJobPhase(ctx, k8sClient, jobKey, c.expectStatus)
// By("Cleaning up")
// err = testutil.CleanUpJob(ctx, k8sClient, &dijob)
// Expect(err).NotTo(HaveOccurred())
// }
// })
// It("Should be marked as Created when submitted", func() {
// By("Create a DIJob")
// var err error
// ctx := context.Background()
// jobTmpl := testutil.NewDIJob()
// jobTmpl.Spec.Coordinator.Template.Spec.Containers[0].Resources.Limits = make(corev1.ResourceList)
// jobTmpl.Spec.Coordinator.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName("nvidia.com/gpu")] =
// resource.MustParse("1m")
// dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl)
// By("Checking the created DIJob is in Created state")
// checkDIJobPhase(ctx, k8sClient, jobKey, div2alpha1.JobPending)
// By("Cleaning up")
// err = testutil.CleanUpJob(ctx, k8sClient, &dijob)
// Expect(err).NotTo(HaveOccurred())
// })
// })
// Context("When creating a DIJob with collectors and learners", func() {
// It("Should record collector and learner status to job status", func() {
// type replica struct {
// name string
// status corev1.PodPhase
// }
// type testCase struct {
// collectors []replica
// learners []replica
// }
// testCases := []testCase{
// {
// collectors: []replica{
// {name: "job-collector-sdf", status: corev1.PodRunning},
// },
// learners: []replica{
// {name: "job-learner-sdf", status: corev1.PodRunning},
// },
// },
// {
// collectors: []replica{
// {name: "job-collector-sdf", status: corev1.PodRunning},
// {name: "job-collector-4tf", status: corev1.PodFailed},
// },
// learners: []replica{
// {name: "job-learner-sdf", status: corev1.PodRunning},
// },
// },
// {
// collectors: []replica{
// {name: "job-collector-sdf", status: corev1.PodRunning},
// {name: "job-collector-4tf", status: corev1.PodFailed},
// },
// learners: []replica{
// {name: "job-learner-sdf", status: corev1.PodSucceeded},
// {name: "job-learner-s4t", status: corev1.PodRunning},
// },
// },
// }
// for i := range testCases {
// c := testCases[i]
// By(fmt.Sprintf("Create %dth DIJob", i+1))
// var err error
// ctx := context.Background()
// jobTmpl := testutil.NewDIJob()
// dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl)
// checkCoordinatorCreated(ctx, dijob)
// replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator")
// podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName}
// // build owner reference
// ownRefer := diutil.NewOwnerReference(div2alpha1.GroupVersion.String(), div2alpha1.KindDIJob, dijob.Name, dijob.UID, true)
// By(fmt.Sprintf("Create replicas for DIJob %s", dijob.Name))
// colStatus := make([]int, 3)
// for _, col := range c.collectors {
// createAndUpdatePodPhase(ctx, k8sClient, col.name, dijob.Name, col.status, dicommon.CollectorName, ownRefer, colStatus)
// }
// lrStatus := make([]int, 3)
// for _, lr := range c.learners {
// createAndUpdatePodPhase(ctx, k8sClient, lr.name, dijob.Name, lr.status, dicommon.LearnerName, ownRefer, lrStatus)
// }
// By("Checking the ReplicaStatus is as expected")
// replicasStatuses := map[div2alpha1.ReplicaType][]int{
// div2alpha1.ReplicaTypeCollector: colStatus,
// div2alpha1.ReplicaTypeLearner: lrStatus,
// }
// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses)
// By("Checking the services are as expected")
// Eventually(func() int {
// svcs, err := diutil.ListServices(ctx, k8sClient, &dijob)
// Expect(err).NotTo(HaveOccurred())
// return len(svcs)
// }, timeout, interval).Should(Equal(1 + len(c.collectors) + len(c.learners)))
// By("Update coordinator to Succeeded")
// err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodSucceeded)
// Expect(err).NotTo(HaveOccurred())
// By("Checking the job is successfully succeeded")
// checkDIJobPhase(ctx, k8sClient, jobKey, div2alpha1.JobSucceeded)
// By("Checking the ReplicaStatus is as expected")
// coorStatus := make([]int, 3)
// coorStatus[2] = 1
// colFinishedStatus := make([]int, 3)
// lrFinishedStatus := make([]int, 3)
// colFinishedStatus[0] = 0
// colFinishedStatus[1] = colStatus[1]
// colFinishedStatus[2] = colStatus[0] + colStatus[2]
// lrFinishedStatus[0] = 0
// lrFinishedStatus[1] = lrStatus[1]
// lrFinishedStatus[2] = lrStatus[0] + lrStatus[2]
// replicasStatuses = map[div2alpha1.ReplicaType][]int{
// div2alpha1.ReplicaTypeCoordinator: coorStatus,
// div2alpha1.ReplicaTypeCollector: colFinishedStatus,
// div2alpha1.ReplicaTypeLearner: lrFinishedStatus,
// }
// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses)
// err = testutil.CleanUpJob(ctx, k8sClient, &dijob)
// Expect(err).NotTo(HaveOccurred())
// }
// })
// It("Should build right gpu ports and master port when the pod is ddp learner", func() {
// type replica struct {
// name string
// ddpLearnerType string
// gpus int
// expectedPorts int
// }
// testCases := []replica{
// {name: "job-ddp-learner-sdf", ddpLearnerType: dicommon.DDPLearnerTypeMaster, gpus: 4, expectedPorts: 5},
// {name: "job-ddp-learner-sdf", ddpLearnerType: dicommon.DDPLearnerTypeWorker, gpus: 6, expectedPorts: 6},
// {name: "job-ddp-learner-sdf", ddpLearnerType: dicommon.DDPLearnerTypeMaster, gpus: 1, expectedPorts: 2},
// {name: "job-ddp-learner-sdf", ddpLearnerType: dicommon.DDPLearnerTypeMaster, gpus: 0, expectedPorts: 2},
// {name: "job-ddp-learner-sdf", ddpLearnerType: dicommon.DDPLearnerTypeWorker, gpus: 0, expectedPorts: 1},
// }
// for i := range testCases {
// c := testCases[i]
// By(fmt.Sprintf("Create %dth DIJob", i+1))
// var err error
// ctx := context.Background()
// jobTmpl := testutil.NewDIJob()
// dijob, _ := createDIJob(ctx, k8sClient, jobTmpl)
// checkCoordinatorCreated(ctx, dijob)
// // build owner reference
// ownRefer := diutil.NewOwnerReference(div2alpha1.GroupVersion.String(), div2alpha1.KindDIJob, dijob.Name, dijob.UID, true)
// By(fmt.Sprintf("Create replicas for DIJob %s", dijob.Name))
// pod := buildPod(c.name, dijob.Name, dicommon.DDPLearnerName, ownRefer)
// pod.Labels[dicommon.DDPLearnerTypeLabel] = c.ddpLearnerType
// resources := commontypes.ResourceQuantity{GPU: resource.MustParse(fmt.Sprint(c.gpus))}
// diutil.SetPodResources(pod, resources)
// err = k8sClient.Create(ctx, pod, &client.CreateOptions{})
// Expect(err).NotTo(HaveOccurred())
// By("Checking the # of service's ports are as expected")
// Eventually(func() int {
// svcs, err := diutil.ListServices(ctx, k8sClient, &dijob)
// Expect(err).NotTo(HaveOccurred())
// _, _, _, _, DDPLearners, err := diutil.ClassifyServices(svcs)
// Expect(err).NotTo(HaveOccurred())
// if len(DDPLearners) == 0 {
// return -1
// }
// return len(DDPLearners[0].Spec.Ports)
// }, timeout, interval).Should(Equal(c.expectedPorts))
// err = testutil.CleanUpJob(ctx, k8sClient, &dijob)
// Expect(err).NotTo(HaveOccurred())
// }
// })
// })
// })
// func createDIJob(ctx context.Context, k8sClient client.Client, dijob *div2alpha1.DIJob) (
// div2alpha1.DIJob, types.NamespacedName) {
// name := diutil.GenerateName(dijob.Name)
// dijob.SetName(name)
// err := k8sClient.Create(ctx, dijob, &client.CreateOptions{})
// Expect(err).ShouldNot(HaveOccurred())
// By(fmt.Sprintf("Checking the DIJob %s is successfully created", name))
// key := types.NamespacedName{Namespace: dijob.Namespace, Name: dijob.Name}
// createdDIjob := div2alpha1.DIJob{}
// Eventually(func() bool {
// err := k8sClient.Get(ctx, key, &createdDIjob)
// return err == nil
// }, timeout, interval).Should(BeTrue())
// return createdDIjob, key
// }
// func checkCoordinatorCreated(ctx context.Context, dijob div2alpha1.DIJob) {
// By("Checking coordinator are created")
// replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator")
// var pod corev1.Pod
// podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName}
// Eventually(func() bool {
// err := k8sClient.Get(ctx, podKey, &pod)
// return err == nil
// }, timeout, interval).Should(BeTrue())
// }
// func createAndUpdatePodPhase(
// ctx context.Context, k8sClient client.Client,
// name, jobName string, status corev1.PodPhase, replicaType string,
// ownRefer metav1.OwnerReference, statuses []int) {
// pod := buildPod(name, jobName, replicaType, ownRefer)
// createPodAndUpdatePhase(ctx, k8sClient, pod, status, statuses)
// }
// func buildPod(name, jobName string, replicaType string,
// ownRefer metav1.OwnerReference) *corev1.Pod {
// pod := testutil.NewPod(name, jobName, ownRefer)
// labs := diutil.GenLabels(jobName)
// labs[dicommon.ReplicaTypeLabel] = replicaType
// labs[dicommon.PodNameLabel] = pod.Name
// pod.SetLabels(labs)
// return pod
// }
// func createPodAndUpdatePhase(ctx context.Context, k8sClient client.Client,
// pod *corev1.Pod, status corev1.PodPhase, statuses []int) {
// err := k8sClient.Create(ctx, pod, &client.CreateOptions{})
// Expect(err).NotTo(HaveOccurred())
// podKey := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
// testutil.UpdatePodPhase(ctx, k8sClient, podKey, status)
// switch status {
// case corev1.PodRunning:
// statuses[0]++
// case corev1.PodFailed:
// statuses[1]++
// case corev1.PodSucceeded:
// statuses[2]++
// }
// }
// func checkDIJobPhase(ctx context.Context, k8sClient client.Client, jobKey types.NamespacedName, phase div2alpha1.Phase) {
// var dijob div2alpha1.DIJob
// Eventually(func() div2alpha1.Phase {
// err := k8sClient.Get(ctx, jobKey, &dijob)
// if err != nil {
// return div2alpha1.JobUnknown
// }
// return dijob.Status.Phase
// }, timeout, interval).Should(Equal(phase))
// }
// func checkReplicasStatuses(ctx context.Context, k8sClient client.Client, jobKey types.NamespacedName, replicasStatuses map[div2alpha1.ReplicaType][]int) {
// for rtype, status := range replicasStatuses {
// var dijob div2alpha1.DIJob
// Eventually(func() []int {
// err := k8sClient.Get(ctx, jobKey, &dijob)
// if err != nil {
// return nil
// }
// if dijob.Status.ReplicaStatus == nil {
// return nil
// }
// result := make([]int, 3)
// result[0] = int(dijob.Status.ReplicaStatus[rtype].Active)
// result[1] = int(dijob.Status.ReplicaStatus[rtype].Failed)
// result[2] = int(dijob.Status.ReplicaStatus[rtype].Succeeded)
// return result
// }, timeout, interval).Should(Equal(status))
// }
// }
import (
"context"
"fmt"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1"
dicommon "opendilab.org/di-orchestrator/pkg/common"
dicontext "opendilab.org/di-orchestrator/pkg/context"
diutil "opendilab.org/di-orchestrator/pkg/utils"
testutil "opendilab.org/di-orchestrator/pkg/utils/testutils"
)
var _ = Describe("DIJob Controller", func() {
Context("When creating a DIJob", func() {
It("Should be succeeded", func() {
By("Create a DIJob")
var err error
jobTmpl := testutil.NewDIJob()
job, jobKey := createAndUpdateReplicas(ctx, jobTmpl)
By("Check the created DIJob is in Starting state")
checkDIJobPhase(ctx, jobKey, div2alpha1.JobStarting)
By("Update workers to Running")
for rank := 0; rank < int(job.Status.Replicas); rank++ {
replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), rank)
podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName}
err = testutil.UpdatePodPhase(ctx, podKey, corev1.PodRunning)
Expect(err).NotTo(HaveOccurred())
}
By("Checking the created DIJob has enough ready replicas")
readyReplicas := int(job.Status.Replicas)
checkReadyReplicas(ctx, jobKey, readyReplicas)
By("Checking the created DIJob is in Running state")
checkDIJobPhase(ctx, jobKey, div2alpha1.JobRunning)
By("Update workers to Succeeded")
for rank := 0; rank < int(job.Status.Replicas); rank++ {
replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), rank)
podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName}
err = testutil.UpdatePodPhase(ctx, podKey, corev1.PodSucceeded)
Expect(err).NotTo(HaveOccurred())
}
By("Checking the job is succeeded")
checkDIJobPhase(ctx, jobKey, div2alpha1.JobSucceeded)
By("Checking there is not ready replicas")
readyReplicas = 0
checkReadyReplicas(ctx, jobKey, readyReplicas)
By("Cleaning up")
err = ctx.CleanUpJob(&job)
Expect(err).NotTo(HaveOccurred())
})
It("DIJob status changed with worker status", func() {
type testCase struct {
workerStatus corev1.PodPhase
expectStatus div2alpha1.Phase
}
testCases := []testCase{
{workerStatus: corev1.PodRunning, expectStatus: div2alpha1.JobRunning},
{workerStatus: corev1.PodFailed, expectStatus: div2alpha1.JobFailed},
{workerStatus: corev1.PodSucceeded, expectStatus: div2alpha1.JobSucceeded},
}
for i := range testCases {
c := testCases[i]
By(fmt.Sprintf("Create the %dth DIJob", i+1))
var err error
jobTmpl := testutil.NewDIJob()
job, jobKey := createAndUpdateReplicas(ctx, jobTmpl)
By("Update workers status")
for rank := 0; rank < int(job.Status.Replicas); rank++ {
replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), rank)
podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName}
err = testutil.UpdatePodPhase(ctx, podKey, c.workerStatus)
Expect(err).NotTo(HaveOccurred())
}
By("Checking the created DIJob's state")
checkDIJobPhase(ctx, jobKey, c.expectStatus)
By("Cleaning up")
err = ctx.CleanUpJob(&job)
Expect(err).NotTo(HaveOccurred())
}
})
It("Should be marked as Pending when submitted", func() {
By("Create a DIJob")
var err error
jobTmpl := testutil.NewDIJob()
jobTmpl.Spec.Template.Spec.Containers[0].Resources.Limits = make(corev1.ResourceList)
jobTmpl.Spec.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName("nvidia.com/gpu")] =
resource.MustParse("1m")
job, jobKey := createDIJob(ctx, jobTmpl)
By("Checking the created DIJob is in Pending state")
checkDIJobPhase(ctx, jobKey, div2alpha1.JobPending)
By("Cleaning up")
err = ctx.CleanUpJob(&job)
Expect(err).NotTo(HaveOccurred())
})
})
Context("When DIJob is starting", func() {
It("Should be Restarting when condition meat", func() {
type testCase struct {
replicas int
scheduled int
ready int
missed bool
rescheduled bool
expectPhase div2alpha1.Phase
expectRestart int32
}
testCases := []testCase{
{replicas: 4, missed: false, scheduled: 4, ready: 4, rescheduled: false, expectPhase: div2alpha1.JobRunning, expectRestart: 0},
{replicas: 4, missed: true, scheduled: 3, ready: 3, rescheduled: false, expectPhase: div2alpha1.JobStarting, expectRestart: 1},
{replicas: 4, missed: false, scheduled: 3, ready: 3, rescheduled: false, expectPhase: div2alpha1.JobStarting, expectRestart: 0},
{replicas: 4, missed: false, scheduled: 3, ready: 3, rescheduled: true, expectPhase: div2alpha1.JobStarting, expectRestart: 1},
{replicas: 4, missed: true, scheduled: 0, ready: 0, rescheduled: false, expectPhase: div2alpha1.JobStarting, expectRestart: 1},
{replicas: 4, missed: true, scheduled: 0, ready: 0, rescheduled: true, expectPhase: div2alpha1.JobStarting, expectRestart: 2},
}
for i := range testCases {
c := testCases[i]
By(fmt.Sprintf("Create the %dth DIJob", i+1))
var err error
jobTmpl := testutil.NewDIJob()
jobTmpl.Spec.MinReplicas = int32(c.replicas)
job, jobKey := createAndUpdateReplicas(ctx, jobTmpl)
if c.missed {
By("Delete missed replicas")
rank := c.replicas - 1
pod, err := getReplicaPod(ctx, jobKey, &job, rank, false)
Expect(err).NotTo(HaveOccurred())
err = ctx.Delete(context.TODO(), pod, &client.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
By("Wait until generation changed")
Eventually(func() int {
ctx.Get(context.TODO(), jobKey, &job)
return int(job.Status.Generation)
}, timeout, interval).Should(Equal(1))
By("Wait until operator recreate all replicas")
Eventually(func() int {
pods, _ := ctx.ListJobPods(&job)
return len(pods)
}, timeout, interval).Should(Equal(c.replicas))
}
recreated := true
By("Update scheduled replicas")
for j := 0; j < c.scheduled; j++ {
pod, err := getReplicaPod(ctx, jobKey, &job, j, recreated)
Expect(err).NotTo(HaveOccurred())
pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{
Type: corev1.PodScheduled,
Status: corev1.ConditionTrue,
})
err = ctx.Status().Update(context.TODO(), pod, &client.UpdateOptions{})
Expect(err).NotTo(HaveOccurred())
}
By("Update ready replicas")
for j := 0; j < c.ready; j++ {
pod, err := getReplicaPod(ctx, jobKey, &job, j, recreated)
Expect(err).NotTo(HaveOccurred())
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, corev1.ContainerStatus{
Ready: true,
})
err = ctx.Status().Update(context.TODO(), pod, &client.UpdateOptions{})
Expect(err).NotTo(HaveOccurred())
}
if c.rescheduled {
By("Mark pod rescheduled")
pod, err := getReplicaPod(ctx, jobKey, &job, 0, recreated)
Expect(err).NotTo(HaveOccurred())
pod.Annotations[dicommon.AnnotationReplicas] = fmt.Sprint(c.replicas + 1)
err = ctx.Update(context.TODO(), pod, &client.UpdateOptions{})
Expect(err).NotTo(HaveOccurred())
By("Wait until generation changed")
Eventually(func() int32 {
ctx.Get(context.TODO(), jobKey, &job)
return job.Status.Generation
}, timeout, interval).Should(Equal(c.expectRestart))
By("Wait until operator recreate all replicas")
Eventually(func() int {
pods, _ := ctx.ListJobPods(&job)
return len(pods)
}, timeout, interval).Should(Equal(c.replicas))
}
By("Check the created DIJob is in expected phase")
checkDIJobPhase(ctx, jobKey, c.expectPhase)
By("Check the restart count is as expected")
err = ctx.Get(context.TODO(), jobKey, &job)
Expect(err).NotTo(HaveOccurred())
Expect(job.Status.Generation).Should(Equal(c.expectRestart))
By("Cleaning up")
err = ctx.CleanUpJob(&job)
Expect(err).NotTo(HaveOccurred())
}
})
})
Context("When DIJob is Running", func() {
It("Should be Restarting when condition meat", func() {
type testCase struct {
replicas int
missed bool
rescheduled bool
expectPhase div2alpha1.Phase
expectRestart int32
}
testCases := []testCase{
{replicas: 4, missed: true, rescheduled: false, expectPhase: div2alpha1.JobStarting, expectRestart: 1},
{replicas: 4, missed: true, rescheduled: true, expectPhase: div2alpha1.JobStarting, expectRestart: 2},
{replicas: 3, missed: false, rescheduled: false, expectPhase: div2alpha1.JobRunning, expectRestart: 0},
{replicas: 4, missed: false, rescheduled: true, expectPhase: div2alpha1.JobStarting, expectRestart: 1},
}
for i := range testCases {
c := testCases[i]
By(fmt.Sprintf("Create the %dth DIJob", i+1))
var err error
jobTmpl := testutil.NewDIJob()
jobTmpl.Spec.MinReplicas = int32(c.replicas)
job, jobKey := createAndUpdateReplicas(ctx, jobTmpl)
By("Update workers to Running")
for rank := 0; rank < int(job.Status.Replicas); rank++ {
replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), rank)
podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName}
err = testutil.UpdatePodPhase(ctx, podKey, corev1.PodRunning)
Expect(err).NotTo(HaveOccurred())
}
By("Checking the created DIJob is in Running state")
checkDIJobPhase(ctx, jobKey, div2alpha1.JobRunning)
if c.missed {
By("Delete missed replicas")
rank := c.replicas - 1
pod, err := getReplicaPod(ctx, jobKey, &job, rank, false)
Expect(err).NotTo(HaveOccurred())
err = ctx.Delete(context.TODO(), pod, &client.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
By("Wait until generation changed")
Eventually(func() int {
ctx.Get(context.TODO(), jobKey, &job)
return int(job.Status.Generation)
}, timeout, interval).Should(Equal(1))
By("Wait until operator recreate all replicas")
Eventually(func() int {
pods, _ := ctx.ListJobPods(&job)
return len(pods)
}, timeout, interval).Should(Equal(c.replicas))
}
recreated := true
if c.rescheduled {
By("Mark pod rescheduled")
pod, err := getReplicaPod(ctx, jobKey, &job, 0, recreated)
Expect(err).NotTo(HaveOccurred())
pod.Annotations[dicommon.AnnotationReplicas] = fmt.Sprint(c.replicas + 1)
err = ctx.Update(context.TODO(), pod, &client.UpdateOptions{})
Expect(err).NotTo(HaveOccurred())
By("Wait until generation changed")
Eventually(func() int32 {
ctx.Get(context.TODO(), jobKey, &job)
return job.Status.Generation
}, timeout, interval).Should(Equal(c.expectRestart))
By("Wait until operator recreate all replicas")
Eventually(func() int {
pods, _ := ctx.ListJobPods(&job)
return len(pods)
}, timeout, interval).Should(Equal(c.replicas))
}
By("Check the created DIJob is in expected phase")
checkDIJobPhase(ctx, jobKey, c.expectPhase)
By("Check the restart count is as expected")
err = ctx.Get(context.TODO(), jobKey, &job)
Expect(err).NotTo(HaveOccurred())
Expect(job.Status.Generation).Should(Equal(c.expectRestart))
By("Cleaning up")
err = ctx.CleanUpJob(&job)
Expect(err).NotTo(HaveOccurred())
}
})
})
})
func createAndUpdateReplicas(ctx dicontext.Context, jobTmpl *div2alpha1.DIJob) (
div2alpha1.DIJob, types.NamespacedName) {
var err error
job, jobKey := createDIJob(ctx, jobTmpl)
By("Sleep for a few time to wait for condition synced")
time.Sleep(100 * time.Millisecond)
err = ctx.Get(context.TODO(), jobKey, &job)
Expect(err).NotTo(HaveOccurred())
By("Update status.replicas")
job.Status.Replicas = job.Spec.MinReplicas
err = ctx.Status().Update(context.TODO(), &job, &client.UpdateOptions{})
Expect(err).NotTo(HaveOccurred())
By("Check replicas created")
checkWorkersCreated(ctx, job)
return job, jobKey
}
func createDIJob(ctx dicontext.Context, job *div2alpha1.DIJob) (
div2alpha1.DIJob, types.NamespacedName) {
name := diutil.GenerateName(job.Name)
job.SetName(name)
key := types.NamespacedName{Namespace: job.Namespace, Name: job.Name}
createdDIjob := div2alpha1.DIJob{}
err := ctx.Create(context.TODO(), job, &client.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("Checking the DIJob %s is successfully created", name))
Eventually(func() bool {
err := ctx.Get(context.TODO(), key, &createdDIjob)
return err == nil
}, timeout, interval).Should(BeTrue())
return createdDIjob, key
}
func checkWorkersCreated(ctx dicontext.Context, job div2alpha1.DIJob) {
Eventually(func() bool {
for i := 0; i < int(job.Spec.MinReplicas); i++ {
replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), i)
podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName}
var pod corev1.Pod
if err := ctx.Get(context.TODO(), podKey, &pod); err != nil {
return false
}
}
return true
}, timeout, interval).Should(BeTrue())
}
func getReplicaPod(ctx dicontext.Context, jobKey types.NamespacedName, job *div2alpha1.DIJob, rank int, recreated bool) (*corev1.Pod, error) {
time.Sleep(10 * time.Millisecond)
var err error
err = ctx.Get(context.TODO(), jobKey, job)
if err != nil {
return nil, err
}
replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), rank)
if !recreated {
replicaName = diutil.ReplicaName(job.Name, 0, rank)
}
podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName}
var pod corev1.Pod
err = ctx.Get(context.TODO(), podKey, &pod)
if err != nil {
return nil, err
}
return &pod, nil
}
func checkDIJobPhase(ctx dicontext.Context, jobKey types.NamespacedName, phase div2alpha1.Phase) {
var job div2alpha1.DIJob
Eventually(func() div2alpha1.Phase {
err := ctx.Get(context.TODO(), jobKey, &job)
if err != nil {
return div2alpha1.JobUnknown
}
return job.Status.Phase
}, timeout, interval).Should(Equal(phase))
}
func checkReadyReplicas(ctx dicontext.Context, jobKey types.NamespacedName, readyReplicas int) {
var job div2alpha1.DIJob
Eventually(func() int {
err := ctx.Get(context.TODO(), jobKey, &job)
if err != nil {
return -1
}
return int(job.Status.ReadyReplicas)
}, timeout, interval).Should(Equal(readyReplicas))
}
package controllers
// import (
// "context"
// "fmt"
import (
"fmt"
"strings"
// . "github.com/onsi/ginkgo"
// . "github.com/onsi/gomega"
// corev1 "k8s.io/api/core/v1"
// "k8s.io/apimachinery/pkg/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
// div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1"
// dicommon "opendilab.org/di-orchestrator/pkg/common"
// diutil "opendilab.org/di-orchestrator/pkg/utils"
// testutil "opendilab.org/di-orchestrator/pkg/utils/testutils"
// )
div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1"
dicommon "opendilab.org/di-orchestrator/pkg/common"
diutil "opendilab.org/di-orchestrator/pkg/utils"
testutil "opendilab.org/di-orchestrator/pkg/utils/testutils"
)
// var _ = Describe("DIJob Specification", func() {
var _ = Describe("DIJob Specification", func() {
// Context("When creating a DIJob with different CleanPodPolicy", func() {
// It("Should execute different pods deletion policy with different CleanPodPolicy", func() {
// cleanPodPolicies := []div2alpha1.CleanPodPolicy{
// div2alpha1.CleanPodPolicyAll,
// div2alpha1.CleanPodPolicyRunning,
// div2alpha1.CleanPodPolicyNone,
// }
// for _, policy := range cleanPodPolicies {
// type replica struct {
// name string
// status corev1.PodPhase
// }
// type testCase struct {
// runnings int
// collectors []replica
// learners []replica
// }
// testCases := []testCase{
// {
// runnings: 2,
// collectors: []replica{
// {name: "job-collector-sdf", status: corev1.PodRunning},
// },
// learners: []replica{
// {name: "job-learner-sdf", status: corev1.PodRunning},
// },
// },
// {
// runnings: 2,
// collectors: []replica{
// {name: "job-collector-sdf", status: corev1.PodRunning},
// {name: "job-collector-4tf", status: corev1.PodFailed},
// },
// learners: []replica{
// {name: "job-learner-sdf", status: corev1.PodRunning},
// },
// },
// {
// runnings: 2,
// collectors: []replica{
// {name: "job-collector-sdf", status: corev1.PodRunning},
// {name: "job-collector-4tf", status: corev1.PodFailed},
// },
// learners: []replica{
// {name: "job-learner-sdf", status: corev1.PodSucceeded},
// {name: "job-learner-s4t", status: corev1.PodRunning},
// },
// },
// }
// for i := range testCases {
// c := testCases[i]
// By(fmt.Sprintf("Create %dth DIJob", i+1))
// var err error
// ctx := context.Background()
// jobTmpl := testutil.NewDIJob()
// jobTmpl.Spec.CleanPodPolicy = policy
// dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl)
// checkCoordinatorCreated(ctx, dijob)
Context("Test DIJob Specification", func() {
It("Should execute different pods deletion policy with different CleanPodPolicy", func() {
cleanPodPolicies := []div2alpha1.CleanPodPolicy{
div2alpha1.CleanPodPolicyAll,
div2alpha1.CleanPodPolicyRunning,
div2alpha1.CleanPodPolicyNone,
}
for _, policy := range cleanPodPolicies {
type testCase struct {
runnings int // pending pods are also considered as running pods
replicaStatues []corev1.PodPhase
}
testCases := []testCase{
{
runnings: 2,
replicaStatues: []corev1.PodPhase{
corev1.PodRunning, corev1.PodRunning, corev1.PodFailed, corev1.PodSucceeded,
},
},
{
runnings: 0,
replicaStatues: []corev1.PodPhase{
corev1.PodFailed, corev1.PodSucceeded, corev1.PodFailed,
},
},
{
runnings: 3,
replicaStatues: []corev1.PodPhase{
corev1.PodPending, corev1.PodRunning, corev1.PodFailed, corev1.PodRunning,
},
},
{
runnings: 0,
replicaStatues: []corev1.PodPhase{
corev1.PodFailed, corev1.PodFailed, corev1.PodFailed,
},
},
{
runnings: 0,
replicaStatues: []corev1.PodPhase{
corev1.PodSucceeded, corev1.PodSucceeded, corev1.PodSucceeded,
},
},
}
for i := range testCases {
c := testCases[i]
By(fmt.Sprintf("Create %dth DIJob", i+1))
var err error
jobTmpl := testutil.NewDIJob()
jobTmpl.Spec.MinReplicas = int32(len(c.replicaStatues))
jobTmpl.Spec.CleanPodPolicy = policy
job, jobKey := createAndUpdateReplicas(ctx, jobTmpl)
// // build owner reference
// ownRefer := diutil.NewOwnerReference(div2alpha1.GroupVersion.String(), div2alpha1.KindDIJob, dijob.Name, dijob.UID, true)
// By(fmt.Sprintf("ownRefer: %s %s", ownRefer.APIVersion, ownRefer.Kind))
// colStatus := make([]int, 3)
// for _, col := range c.collectors {
// By(fmt.Sprintf("Create pod %s", col.name))
// createAndUpdatePodPhase(ctx, k8sClient, col.name, dijob.Name, col.status, dicommon.CollectorName, ownRefer, colStatus)
// }
By("Check the created DIJob is in Starting state")
checkDIJobPhase(ctx, jobKey, div2alpha1.JobStarting)
// lrStatus := make([]int, 3)
// for _, lr := range c.learners {
// By(fmt.Sprintf("Create pod %s", lr.name))
// createAndUpdatePodPhase(ctx, k8sClient, lr.name, dijob.Name, lr.status, dicommon.LearnerName, ownRefer, lrStatus)
// }
By("Update workers status")
for rank := 0; rank < len(c.replicaStatues); rank++ {
replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), rank)
podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName}
err = testutil.UpdatePodPhase(ctx, podKey, c.replicaStatues[rank])
Expect(err).NotTo(HaveOccurred())
}
// By("Get the number of pods")
// pods, err := diutil.ListPods(ctx, k8sClient, &dijob)
// Expect(err).NotTo(HaveOccurred())
// npods := len(pods)
By("Get the number of pods")
pods, err := ctx.ListJobPods(&job)
Expect(err).NotTo(HaveOccurred())
npods := len(pods)
// By("Update coordinator to Succeeded")
// for _, replicaName := range []string{
// diutil.ReplicaPodName(dijob.Name, "coordinator"),
// } {
// podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName}
// err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodSucceeded)
// Expect(err).NotTo(HaveOccurred())
// }
By("Checking all the pods and services are deleted")
// By("Checking the job is succeeded")
// Eventually(func() div2alpha1.Phase {
// err := k8sClient.Get(ctx, jobKey, &dijob)
// if err != nil {
// return div2alpha1.JobUnknown
// }
// return dijob.Status.Phase
// }, timeout, interval).Should(Equal(div2alpha1.JobSucceeded))
switch policy {
case div2alpha1.CleanPodPolicyAll:
Eventually(func() int {
pods, err := ctx.ListJobPods(&job)
if err != nil {
return -1
}
return len(pods)
}, timeout, interval).Should(Equal(0))
Eventually(func() int {
svcs, err := ctx.ListJobServices(&job)
if err != nil {
return -1
}
return len(svcs)
}, timeout, interval).Should(Equal(0))
case div2alpha1.CleanPodPolicyNone:
Consistently(func() int {
pods, err := ctx.ListJobPods(&job)
if err != nil {
return -1
}
return len(pods)
}, duration, interval).Should(Equal(npods))
Eventually(func() int {
svcs, err := ctx.ListJobServices(&job)
if err != nil {
return -1
}
return len(svcs)
}, timeout, interval).Should(Equal(0))
case div2alpha1.CleanPodPolicyRunning:
Eventually(func() int {
pods, err := ctx.ListJobPods(&job)
if err != nil {
return -1
}
return len(pods)
}, timeout, interval).Should(Equal(npods - c.runnings))
Eventually(func() int {
svcs, err := ctx.ListJobServices(&job)
if err != nil {
return -1
}
return len(svcs)
}, timeout, interval).Should(Equal(0))
}
// By("Checking all the pods and services are deleted")
By("Clean up pods")
err = ctx.CleanUpJob(&job)
Expect(err).NotTo(HaveOccurred())
}
}
})
It("Should create replicas with different connections relying on topology and parallel workers", func() {
type testCase struct {
topology div2alpha1.Topology
replicas int
paralleWorkers int
expectAttachedNodes int
}
testCases := []testCase{
{
topology: div2alpha1.TopologyAlone, replicas: 1, paralleWorkers: 1, expectAttachedNodes: 0,
},
{
topology: div2alpha1.TopologyAlone, replicas: 2, paralleWorkers: 3, expectAttachedNodes: 0,
},
{
topology: div2alpha1.TopologyStar, replicas: 1, paralleWorkers: 1, expectAttachedNodes: 0,
},
{
topology: div2alpha1.TopologyStar, replicas: 2, paralleWorkers: 3, expectAttachedNodes: 1,
},
{
topology: div2alpha1.TopologyStar, replicas: 3, paralleWorkers: 3, expectAttachedNodes: 2,
},
{
topology: div2alpha1.TopologyStar, replicas: 3, paralleWorkers: 4, expectAttachedNodes: 2,
},
{
topology: div2alpha1.TopologyMesh, replicas: 1, paralleWorkers: 1, expectAttachedNodes: 0,
},
{
topology: div2alpha1.TopologyMesh, replicas: 2, paralleWorkers: 3, expectAttachedNodes: 3,
},
{
topology: div2alpha1.TopologyMesh, replicas: 3, paralleWorkers: 3, expectAttachedNodes: 9,
},
{
topology: div2alpha1.TopologyMesh, replicas: 3, paralleWorkers: 4, expectAttachedNodes: 12,
},
}
for i := range testCases {
c := testCases[i]
By(fmt.Sprintf("Create %dth DIJob", i+1))
var err error
jobTmpl := testutil.NewDIJob()
jobTmpl.Spec.MinReplicas = int32(c.replicas)
jobTmpl.Spec.EngineFields.ParallelWorkers = int32(c.paralleWorkers)
jobTmpl.Spec.EngineFields.Topology = c.topology
job, jobKey := createAndUpdateReplicas(ctx, jobTmpl)
// switch policy {
// case div2alpha1.CleanPodPolicyAll:
// Eventually(func() int {
// pods, err := diutil.ListPods(ctx, k8sClient, &dijob)
// if err != nil {
// return -1
// }
// return len(pods)
// }, timeout, interval).Should(Equal(0))
// Eventually(func() int {
// svcs, err := diutil.ListServices(ctx, k8sClient, &dijob)
// if err != nil {
// return -1
// }
// return len(svcs)
// }, timeout, interval).Should(Equal(0))
// case div2alpha1.CleanPodPolicyNone:
// Consistently(func() int {
// pods, err := diutil.ListPods(ctx, k8sClient, &dijob)
// if err != nil {
// return -1
// }
// return len(pods)
// }, duration, interval).Should(Equal(npods))
// Eventually(func() int {
// svcs, err := diutil.ListServices(ctx, k8sClient, &dijob)
// if err != nil {
// return -1
// }
// return len(svcs)
// }, duration, interval).Should(Equal(0))
// case div2alpha1.CleanPodPolicyRunning:
// Eventually(func() int {
// pods, err := diutil.ListPods(ctx, k8sClient, &dijob)
// if err != nil {
// return -1
// }
// return len(pods)
// }, duration, interval).Should(Equal(npods - c.runnings))
// Eventually(func() int {
// svcs, err := diutil.ListServices(ctx, k8sClient, &dijob)
// if err != nil {
// return -1
// }
// return len(svcs)
// }, duration, interval).Should(Equal(0))
// }
By("Check the created DIJob is in Starting state")
checkDIJobPhase(ctx, jobKey, div2alpha1.JobStarting)
// By("Clean up pods")
// err = testutil.CleanUpJob(ctx, k8sClient, dijob.DeepCopy())
// Expect(err).NotTo(HaveOccurred())
// }
// }
// })
// })
// })
By("Check workers' attached nodes are as expected")
Eventually(func() int {
pods, err := ctx.ListJobPods(&job)
if err != nil {
return -1
}
attachedNodes := 0
for _, pod := range pods {
for _, env := range pod.Spec.Containers[0].Env {
if env.Name == dicommon.ENVAttachedNodesArg {
if env.Value == "" {
continue
}
attachedNodes += len(strings.Split(env.Value, ","))
}
}
}
return attachedNodes
}, timeout, interval).Should(Equal(c.expectAttachedNodes))
By("Clean up pods")
err = ctx.CleanUpJob(&job)
Expect(err).NotTo(HaveOccurred())
}
})
})
})
......@@ -21,13 +21,13 @@ import (
"fmt"
"path/filepath"
"testing"
"time"
. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/config"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
logf "sigs.k8s.io/controller-runtime/pkg/log"
......@@ -42,20 +42,20 @@ import (
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
const (
// timeout = 5 * time.Second
// interval = 250 * time.Millisecond
// duration = 200 * time.Millisecond
timeout = 5 * time.Second
interval = 250 * time.Millisecond
duration = 200 * time.Millisecond
)
// var cfg *rest.Config
var k8sClient client.Client
var ctx dicontext.Context
var testEnv *envtest.Environment
func TestControllers(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecsWithDefaultAndCustomReporters(t,
"Controller Suite",
"DI-Controller Suite",
[]Reporter{printer.NewlineReporter{}})
}
......@@ -77,10 +77,6 @@ var _ = BeforeSuite(func() {
//+kubebuilder:scaffold:scheme
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())
// create controller manager
metricPort := config.GinkgoConfig.ParallelNode + 8200
metricAddress := fmt.Sprintf(":%d", metricPort)
......@@ -90,11 +86,11 @@ var _ = BeforeSuite(func() {
})
Expect(err).NotTo(HaveOccurred())
ctx := dicontext.NewContext(context.Background(),
ctx = dicontext.NewContext(context.Background(),
cfg,
k8sManager.GetClient(),
k8sManager.GetEventRecorderFor("di-operator"),
ctrl.Log.WithName("di-operator"))
k8sManager.GetEventRecorderFor("controller"),
ctrl.Log.WithName("controller"))
reconciler := NewDIJobReconciler(k8sManager.GetScheme(), ctx)
if err = reconciler.SetupWithManager(k8sManager); err != nil {
Expect(err).NotTo(HaveOccurred())
......
......@@ -68,7 +68,7 @@ func TestServer(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecsWithDefaultAndCustomReporters(t,
"Server Suite",
"DI-Server Suite",
[]Reporter{printer.NewlineReporter{}})
}
......
......@@ -19,6 +19,9 @@ func NewDIJob() *div2alpha1.DIJob {
Namespace: DIJobNamespace,
},
Spec: div2alpha1.DIJobSpec{
MinReplicas: 1,
MaxReplicas: 4,
Preemptible: false,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
......
......@@ -15,9 +15,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
dicommon "opendilab.org/di-orchestrator/pkg/common"
dicontext "opendilab.org/di-orchestrator/pkg/context"
)
func NewPod(name, jobName string, ownRefer metav1.OwnerReference) *corev1.Pod {
func NewPod(name, namespace string, ownRefer metav1.OwnerReference) *corev1.Pod {
pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
......@@ -25,7 +26,7 @@ func NewPod(name, jobName string, ownRefer metav1.OwnerReference) *corev1.Pod {
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: DIJobNamespace,
Namespace: namespace,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
......@@ -41,9 +42,9 @@ func NewPod(name, jobName string, ownRefer metav1.OwnerReference) *corev1.Pod {
return pod
}
func UpdatePodPhase(ctx context.Context, k8sClient client.Client, podKey types.NamespacedName, phase corev1.PodPhase) error {
func UpdatePodPhase(ctx dicontext.Context, podKey types.NamespacedName, phase corev1.PodPhase) error {
var pod corev1.Pod
err := k8sClient.Get(ctx, podKey, &pod)
err := ctx.Get(context.TODO(), podKey, &pod)
if err != nil {
return err
}
......@@ -54,11 +55,11 @@ func UpdatePodPhase(ctx context.Context, k8sClient client.Client, podKey types.N
state := corev1.ContainerStateRunning{}
cstatus := corev1.ContainerStatus{Name: containerName, State: corev1.ContainerState{
Running: &state,
}}
}, Ready: true}
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, cstatus)
}
err = k8sClient.Status().Update(ctx, &pod, &client.UpdateOptions{})
err = ctx.Status().Update(context.TODO(), &pod, &client.UpdateOptions{})
if err != nil {
return err
}
......