diff --git a/cmd/operator/operator.go b/cmd/operator/operator.go index 7ce60da8d1ce128a74893738db3610056b376499..985f72c94b64b4f754ab819593c918cd174b58d9 100644 --- a/cmd/operator/operator.go +++ b/cmd/operator/operator.go @@ -101,8 +101,8 @@ func runCommand(cmd *cobra.Command, options *CreateOptions) error { return err } - ctx := handler.NewContext(config, - context.Background(), + ctx := handler.NewContext(context.Background(), + config, mgr.GetClient(), mgr.GetEventRecorderFor("di-operator"), ctrl.Log.WithName("di-operator")) diff --git a/e2e/e2e_suite_test.go b/e2e/e2e_suite_test.go index bd9e6a39a688c9b7374c28e2d2065059cc592ab8..583f6d0d5e1a671b5bc5c0fde1e78c1718778acf 100644 --- a/e2e/e2e_suite_test.go +++ b/e2e/e2e_suite_test.go @@ -1,86 +1,86 @@ package e2e -import ( - "context" - "flag" - "os" - "path/filepath" - "testing" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/clientcmd" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/envtest/printer" - - div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2" -) - -func TestE2E(t *testing.T) { - RegisterFailHandler(Fail) - - RunSpecsWithDefaultAndCustomReporters(t, - "E2E Suite", - []Reporter{printer.NewlineReporter{}}) -} - -var ( - k8sClient client.Client - clientset *kubernetes.Clientset - - kubeconfig string - exampleJobsDir string - sharedVolumesDir string -) - -func init() { - testing.Init() - - if flag.Lookup("kubeconfig") == nil { - flag.StringVar(&kubeconfig, "kubeconfig", "", "kubeconfig file path") - } - flag.StringVar(&sharedVolumesDir, "shared-volumes-dir", "/data/nfs/ding/", "dir to shared volumes") - flag.StringVar(&exampleJobsDir, "example-jobs-dir", "./config", "dir to the example jobs") - flag.Parse() - - kubeconfig = flag.Lookup("kubeconfig").Value.String() - - if kubeconfig == "" { - kubeconfig = os.Getenv("KUBECONFIG") - if kubeconfig == "" { - kubeconfig = filepath.Join(homeDir(), ".kube", "config") - } - } -} - -func homeDir() string { - if h := os.Getenv("HOME"); h != "" { - return h - } - return os.Getenv("USERPROFILE") // windows -} - -var _ = BeforeSuite(func() { - // uses the current context in kubeconfig - cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig) - Expect(err).NotTo(HaveOccurred()) - err = div1alpha2.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - //+kubebuilder:scaffold:scheme - - k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) - Expect(err).NotTo(HaveOccurred()) - Expect(k8sClient).NotTo(BeNil()) - - clientset, err = kubernetes.NewForConfig(cfg) - Expect(err).NotTo(HaveOccurred()) - - k8sClient.DeleteAllOf(context.Background(), &div1alpha2.DIJob{}, - client.InNamespace(namespace), client.MatchingLabels{"stability-test": "dijobs"}) -}) - -var _ = AfterSuite(func() { -}) +// import ( +// "context" +// "flag" +// "os" +// "path/filepath" +// "testing" + +// . "github.com/onsi/ginkgo" +// . "github.com/onsi/gomega" +// "k8s.io/client-go/kubernetes" +// "k8s.io/client-go/kubernetes/scheme" +// "k8s.io/client-go/tools/clientcmd" +// "sigs.k8s.io/controller-runtime/pkg/client" +// "sigs.k8s.io/controller-runtime/pkg/envtest/printer" + +// div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2" +// ) + +// func TestE2E(t *testing.T) { +// RegisterFailHandler(Fail) + +// RunSpecsWithDefaultAndCustomReporters(t, +// "E2E Suite", +// []Reporter{printer.NewlineReporter{}}) +// } + +// var ( +// k8sClient client.Client +// clientset *kubernetes.Clientset + +// kubeconfig string +// exampleJobsDir string +// sharedVolumesDir string +// ) + +// func init() { +// testing.Init() + +// if flag.Lookup("kubeconfig") == nil { +// flag.StringVar(&kubeconfig, "kubeconfig", "", "kubeconfig file path") +// } +// flag.StringVar(&sharedVolumesDir, "shared-volumes-dir", "/data/nfs/ding/", "dir to shared volumes") +// flag.StringVar(&exampleJobsDir, "example-jobs-dir", "./config", "dir to the example jobs") +// flag.Parse() + +// kubeconfig = flag.Lookup("kubeconfig").Value.String() + +// if kubeconfig == "" { +// kubeconfig = os.Getenv("KUBECONFIG") +// if kubeconfig == "" { +// kubeconfig = filepath.Join(homeDir(), ".kube", "config") +// } +// } +// } + +// func homeDir() string { +// if h := os.Getenv("HOME"); h != "" { +// return h +// } +// return os.Getenv("USERPROFILE") // windows +// } + +// var _ = BeforeSuite(func() { +// // uses the current context in kubeconfig +// cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig) +// Expect(err).NotTo(HaveOccurred()) +// err = div1alpha2.AddToScheme(scheme.Scheme) +// Expect(err).NotTo(HaveOccurred()) + +// //+kubebuilder:scaffold:scheme + +// k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) +// Expect(err).NotTo(HaveOccurred()) +// Expect(k8sClient).NotTo(BeNil()) + +// clientset, err = kubernetes.NewForConfig(cfg) +// Expect(err).NotTo(HaveOccurred()) + +// k8sClient.DeleteAllOf(context.Background(), &div1alpha2.DIJob{}, +// client.InNamespace(namespace), client.MatchingLabels{"stability-test": "dijobs"}) +// }) + +// var _ = AfterSuite(func() { +// }) diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 6c2441fd66ae8b840912dd52acc8819076d682c9..0d2505955aa84851172748be3f76cd4a85d5243d 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -1,297 +1,297 @@ package e2e -import ( - "context" - "fmt" - "io/ioutil" - "os/exec" - "path/filepath" - "strings" - "time" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/yaml" - "sigs.k8s.io/controller-runtime/pkg/client" - - div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2" - dicommon "opendilab.org/di-orchestrator/pkg/common" - diutil "opendilab.org/di-orchestrator/pkg/utils" - testutil "opendilab.org/di-orchestrator/pkg/utils/testutils" -) - -const ( - namespace = "test" - messageRepresentJobStartsTraining = "Sample data 0 Times" - defaultSharedVolumeName = "work-dir" - - logTimeout = 5 * time.Minute - networkFailedDuration = 10 * time.Second - timeout = 20 * time.Minute - interval = 3 * time.Second -) - -var _ = Describe("E2E test for DI-engine", func() { - Context("When DIJob meets network exception", func() { - It("Should reconnect after each module's service is recreated", func() { - testCases := []string{"coordinator", "collector", "learner", "aggregator", "ddp-learner"} - for i := range testCases { - tc := testCases[i] - By(fmt.Sprintf("Create %dth DIJob", i+1)) - - jobPath := filepath.Join(exampleJobsDir, "dijob.yaml") - if tc == "aggregator" || tc == "ddp-learner" { - jobPath = filepath.Join(exampleJobsDir, "dijob-multi-gpu.yaml") - } - - sharedVolumePath := filepath.Join(sharedVolumesDir, fmt.Sprintf("cartpole-%d", i)) - job := buildDIJob(jobPath, sharedVolumePath) - - ctx := context.Background() - var err error - - err = k8sClient.Create(ctx, job, &client.CreateOptions{}) - Expect(err).NotTo(HaveOccurred()) - - By(fmt.Sprintf("Waiting for replicas to be running")) - err = testutil.WaitForAllReplicas(ctx, k8sClient, job, corev1.PodRunning) - Expect(err).NotTo(HaveOccurred()) - - By("Checking coordinator's log to decide to delete services") - replicaName := fmt.Sprintf("%s-%s", job.Name, "coordinator") - err = testutil.GetPodLogs(clientset, namespace, replicaName, dicommon.DefaultContainerName, true, messageRepresentJobStartsTraining, logTimeout) - Expect(err).NotTo(HaveOccurred()) - - By(fmt.Sprintf("Delete %s service for a while", tc)) - svcs, err := diutil.ListServices(ctx, k8sClient, job) - Expect(err).NotTo(HaveOccurred()) - - var svc *corev1.Service - for _, isvc := range svcs { - if strings.Contains(isvc.Name, tc) { - svc = isvc - } - } - - By(fmt.Sprintf("Delete %s service", svc.Name)) - err = k8sClient.Delete(ctx, svc, &client.DeleteOptions{}) - Expect(err).NotTo(HaveOccurred()) - - By("Waiting for job to be succeeded") - jobKey := types.NamespacedName{Namespace: job.Namespace, Name: job.Name} - Eventually(func() div1alpha2.Phase { - err := k8sClient.Get(ctx, jobKey, job) - if err != nil { - return div1alpha2.JobUnknown - } - return job.Status.Phase - }, timeout, interval).Should(Equal(div1alpha2.JobSucceeded)) - - testutil.CleanUpJob(ctx, k8sClient, job) - } - }) - - It("Should reconnect after all replicas are available again", func() { - By("Create DIJob") - jobPath := filepath.Join(exampleJobsDir, "dijob-multi-gpu.yaml") - sharedVolumePath := filepath.Join(sharedVolumesDir, "cartpole-multi-gpu") - job := buildDIJob(jobPath, sharedVolumePath) - - ctx := context.Background() - var err error - - err = k8sClient.Create(ctx, job, &client.CreateOptions{}) - Expect(err).NotTo(HaveOccurred()) - - By(fmt.Sprintf("Waiting for replicas to be running")) - err = testutil.WaitForAllReplicas(ctx, k8sClient, job, corev1.PodRunning) - Expect(err).NotTo(HaveOccurred()) - - By("Checking coordinator's log to decide to delete services") - replicaName := fmt.Sprintf("%s-%s", job.Name, "coordinator") - err = testutil.GetPodLogs(clientset, namespace, replicaName, dicommon.DefaultContainerName, true, messageRepresentJobStartsTraining, logTimeout) - Expect(err).NotTo(HaveOccurred()) - - By("Delete all modules' service for a while") - svcs, err := diutil.ListServices(ctx, k8sClient, job) - Expect(err).NotTo(HaveOccurred()) - - for _, svc := range svcs { - svc.ResourceVersion = "" - svc.Status = corev1.ServiceStatus{} - - By(fmt.Sprintf("Delete %s service", svc.Name)) - err = k8sClient.Delete(ctx, svc, &client.DeleteOptions{}) - Expect(err).NotTo(HaveOccurred()) - - By(fmt.Sprintf("Sleep for %s before delete next module", networkFailedDuration.String())) - time.Sleep(networkFailedDuration) - } - - By("Waiting for job to be succeeded") - jobKey := types.NamespacedName{Namespace: job.Namespace, Name: job.Name} - Eventually(func() div1alpha2.Phase { - err := k8sClient.Get(ctx, jobKey, job) - if err != nil { - return div1alpha2.JobUnknown - } - return job.Status.Phase - }, timeout, interval).Should(Equal(div1alpha2.JobSucceeded)) - - testutil.CleanUpJob(ctx, k8sClient, job) - }) - }) - Context("When some of DIJob's replicas are deleted", func() { - It("DIJob should catch the change and request di-server to create new replicas", func() { - testCases := []string{"coordinator", "collector", "learner", "aggregator", "ddp-learner"} - for i := range testCases { - tc := testCases[i] - By(fmt.Sprintf("Create %dth DIJob", i+1)) - - jobPath := filepath.Join(exampleJobsDir, "dijob-sidecar.yaml") - if tc == "aggregator" || tc == "ddp-learner" { - jobPath = filepath.Join(exampleJobsDir, "dijob-sidecar-multi-gpu.yaml") - } - - sharedVolumePath := filepath.Join(sharedVolumesDir, fmt.Sprintf("cartpole-%d", i)) - job := buildDIJob(jobPath, sharedVolumePath) - - ctx := context.Background() - var err error - - err = k8sClient.Create(ctx, job, &client.CreateOptions{}) - Expect(err).NotTo(HaveOccurred()) - - By(fmt.Sprintf("Waiting for replicas to be running")) - err = testutil.WaitForAllReplicas(ctx, k8sClient, job, corev1.PodRunning) - Expect(err).NotTo(HaveOccurred()) - - By("Checking coordinator's log to decide to delete replicas") - replicaName := fmt.Sprintf("%s-%s", job.Name, "coordinator") - err = testutil.GetPodLogs(clientset, namespace, replicaName, dicommon.DefaultContainerName, true, messageRepresentJobStartsTraining, logTimeout) - Expect(err).NotTo(HaveOccurred()) - - By(fmt.Sprintf("Delete %s pod for a while", tc)) - pods, err := diutil.ListPods(ctx, k8sClient, job) - Expect(err).NotTo(HaveOccurred()) - - var pod *corev1.Pod - for _, ipod := range pods { - if strings.Contains(ipod.Name, tc) { - pod = ipod - } - } - - By(fmt.Sprintf("Delete %s replica", pod.Name)) - err = k8sClient.Delete(ctx, pod, &client.DeleteOptions{}) - Expect(err).NotTo(HaveOccurred()) - - By("Waiting for job to be succeeded") - jobKey := types.NamespacedName{Namespace: job.Namespace, Name: job.Name} - Eventually(func() div1alpha2.Phase { - err := k8sClient.Get(ctx, jobKey, job) - if err != nil { - return div1alpha2.JobUnknown - } - return job.Status.Phase - }, timeout, interval).Should(Equal(div1alpha2.JobSucceeded)) - - testutil.CleanUpJob(ctx, k8sClient, job) - } - }) - }) - - Context("When some of DIJob's replicas are failed", func() { - It("DIJob should catch the change and request di-server to create new replicas", func() { - testCases := []string{"coordinator", "collector", "learner", "aggregator", "ddp-learner"} - for i := range testCases { - tc := testCases[i] - By(fmt.Sprintf("Create %dth DIJob", i+1)) - - jobPath := filepath.Join(exampleJobsDir, "dijob-sidecar.yaml") - if tc == "aggregator" || tc == "ddp-learner" { - jobPath = filepath.Join(exampleJobsDir, "dijob-sidecar-multi-gpu.yaml") - } - - sharedVolumePath := filepath.Join(sharedVolumesDir, fmt.Sprintf("cartpole-%d", i)) - job := buildDIJob(jobPath, sharedVolumePath) - - ctx := context.Background() - var err error - - err = k8sClient.Create(ctx, job, &client.CreateOptions{}) - Expect(err).NotTo(HaveOccurred()) - - By(fmt.Sprintf("Waiting for replicas to be running")) - err = testutil.WaitForAllReplicas(ctx, k8sClient, job, corev1.PodRunning) - Expect(err).NotTo(HaveOccurred()) - - By("Checking coordinator's log to decide to failed replicas") - replicaName := fmt.Sprintf("%s-%s", job.Name, "coordinator") - err = testutil.GetPodLogs(clientset, namespace, replicaName, dicommon.DefaultContainerName, true, messageRepresentJobStartsTraining, logTimeout) - Expect(err).NotTo(HaveOccurred()) - - By(fmt.Sprintf("Make %s pod failed", tc)) - pods, err := diutil.ListPods(ctx, k8sClient, job) - Expect(err).NotTo(HaveOccurred()) - - var pod *corev1.Pod - for _, ipod := range pods { - if strings.Contains(ipod.Name, tc) { - pod = ipod - } - } - - By(fmt.Sprintf("Make %s replica failed", pod.Name)) - cmd := fmt.Sprintf("kubectl exec -n %s %s -c shell -- sh -c ", pod.Namespace, pod.Name) - podCmd := `"kill -9 \$(ps -ef |grep \"/opt/conda/bin/ding\"| grep -v \"ps -ef\"|awk 'NR<2 {print \$1}')"` - cmd = fmt.Sprintf("%s%s", cmd, podCmd) - command := exec.Command("bash", "-c", cmd) - _, err = command.CombinedOutput() - Expect(err).NotTo(HaveOccurred()) - - By("Waiting for job to be completed") - expectedContainerStatus := "Completed" - if tc == "coordinator" { - expectedContainerStatus = "Error" - } - podKey := types.NamespacedName{Namespace: job.Namespace, Name: fmt.Sprintf("%s-%s", job.Name, tc)} - Eventually(func() string { - var pod *corev1.Pod - err := k8sClient.Get(ctx, podKey, pod) - if err != nil { - return "Unknown" - } - for _, status := range pod.Status.ContainerStatuses { - if status.Name != dicommon.DefaultContainerName { - continue - } - if status.State.Terminated != nil { - return status.State.Terminated.Reason - } - } - return "Unknown" - }, timeout, interval).Should(Equal(expectedContainerStatus)) - - testutil.CleanUpJob(ctx, k8sClient, job) - } - }) - }) -}) - -func buildDIJob(jobPath, sharedVolumePath string) *div1alpha2.DIJob { - yamlFile, err := ioutil.ReadFile(jobPath) - Expect(err).NotTo(HaveOccurred()) - - var job div1alpha2.DIJob - err = yaml.Unmarshal(yamlFile, &job) - Expect(err).NotTo(HaveOccurred()) - - if job.Labels == nil { - job.Labels = make(map[string]string) - } - job.Labels["stability-test"] = "dijobs" - return &job -} +// import ( +// "context" +// "fmt" +// "io/ioutil" +// "os/exec" +// "path/filepath" +// "strings" +// "time" + +// . "github.com/onsi/ginkgo" +// . "github.com/onsi/gomega" +// corev1 "k8s.io/api/core/v1" +// "k8s.io/apimachinery/pkg/types" +// "k8s.io/apimachinery/pkg/util/yaml" +// "sigs.k8s.io/controller-runtime/pkg/client" + +// div1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2" +// dicommon "opendilab.org/di-orchestrator/pkg/common" +// diutil "opendilab.org/di-orchestrator/pkg/utils" +// testutil "opendilab.org/di-orchestrator/pkg/utils/testutils" +// ) + +// const ( +// namespace = "test" +// messageRepresentJobStartsTraining = "Sample data 0 Times" +// defaultSharedVolumeName = "work-dir" + +// logTimeout = 5 * time.Minute +// networkFailedDuration = 10 * time.Second +// timeout = 20 * time.Minute +// interval = 3 * time.Second +// ) + +// var _ = Describe("E2E test for DI-engine", func() { +// Context("When DIJob meets network exception", func() { +// It("Should reconnect after each module's service is recreated", func() { +// testCases := []string{"coordinator", "collector", "learner", "aggregator", "ddp-learner"} +// for i := range testCases { +// tc := testCases[i] +// By(fmt.Sprintf("Create %dth DIJob", i+1)) + +// jobPath := filepath.Join(exampleJobsDir, "dijob.yaml") +// if tc == "aggregator" || tc == "ddp-learner" { +// jobPath = filepath.Join(exampleJobsDir, "dijob-multi-gpu.yaml") +// } + +// sharedVolumePath := filepath.Join(sharedVolumesDir, fmt.Sprintf("cartpole-%d", i)) +// job := buildDIJob(jobPath, sharedVolumePath) + +// ctx := context.Background() +// var err error + +// err = k8sClient.Create(ctx, job, &client.CreateOptions{}) +// Expect(err).NotTo(HaveOccurred()) + +// By(fmt.Sprintf("Waiting for replicas to be running")) +// err = testutil.WaitForAllReplicas(ctx, k8sClient, job, corev1.PodRunning) +// Expect(err).NotTo(HaveOccurred()) + +// By("Checking coordinator's log to decide to delete services") +// replicaName := fmt.Sprintf("%s-%s", job.Name, "coordinator") +// err = testutil.GetPodLogs(clientset, namespace, replicaName, dicommon.DefaultContainerName, true, messageRepresentJobStartsTraining, logTimeout) +// Expect(err).NotTo(HaveOccurred()) + +// By(fmt.Sprintf("Delete %s service for a while", tc)) +// svcs, err := diutil.ListServices(ctx, k8sClient, job) +// Expect(err).NotTo(HaveOccurred()) + +// var svc *corev1.Service +// for _, isvc := range svcs { +// if strings.Contains(isvc.Name, tc) { +// svc = isvc +// } +// } + +// By(fmt.Sprintf("Delete %s service", svc.Name)) +// err = k8sClient.Delete(ctx, svc, &client.DeleteOptions{}) +// Expect(err).NotTo(HaveOccurred()) + +// By("Waiting for job to be succeeded") +// jobKey := types.NamespacedName{Namespace: job.Namespace, Name: job.Name} +// Eventually(func() div1alpha2.Phase { +// err := k8sClient.Get(ctx, jobKey, job) +// if err != nil { +// return div1alpha2.JobUnknown +// } +// return job.Status.Phase +// }, timeout, interval).Should(Equal(div1alpha2.JobSucceeded)) + +// testutil.CleanUpJob(ctx, k8sClient, job) +// } +// }) + +// It("Should reconnect after all replicas are available again", func() { +// By("Create DIJob") +// jobPath := filepath.Join(exampleJobsDir, "dijob-multi-gpu.yaml") +// sharedVolumePath := filepath.Join(sharedVolumesDir, "cartpole-multi-gpu") +// job := buildDIJob(jobPath, sharedVolumePath) + +// ctx := context.Background() +// var err error + +// err = k8sClient.Create(ctx, job, &client.CreateOptions{}) +// Expect(err).NotTo(HaveOccurred()) + +// By(fmt.Sprintf("Waiting for replicas to be running")) +// err = testutil.WaitForAllReplicas(ctx, k8sClient, job, corev1.PodRunning) +// Expect(err).NotTo(HaveOccurred()) + +// By("Checking coordinator's log to decide to delete services") +// replicaName := fmt.Sprintf("%s-%s", job.Name, "coordinator") +// err = testutil.GetPodLogs(clientset, namespace, replicaName, dicommon.DefaultContainerName, true, messageRepresentJobStartsTraining, logTimeout) +// Expect(err).NotTo(HaveOccurred()) + +// By("Delete all modules' service for a while") +// svcs, err := diutil.ListServices(ctx, k8sClient, job) +// Expect(err).NotTo(HaveOccurred()) + +// for _, svc := range svcs { +// svc.ResourceVersion = "" +// svc.Status = corev1.ServiceStatus{} + +// By(fmt.Sprintf("Delete %s service", svc.Name)) +// err = k8sClient.Delete(ctx, svc, &client.DeleteOptions{}) +// Expect(err).NotTo(HaveOccurred()) + +// By(fmt.Sprintf("Sleep for %s before delete next module", networkFailedDuration.String())) +// time.Sleep(networkFailedDuration) +// } + +// By("Waiting for job to be succeeded") +// jobKey := types.NamespacedName{Namespace: job.Namespace, Name: job.Name} +// Eventually(func() div1alpha2.Phase { +// err := k8sClient.Get(ctx, jobKey, job) +// if err != nil { +// return div1alpha2.JobUnknown +// } +// return job.Status.Phase +// }, timeout, interval).Should(Equal(div1alpha2.JobSucceeded)) + +// testutil.CleanUpJob(ctx, k8sClient, job) +// }) +// }) +// Context("When some of DIJob's replicas are deleted", func() { +// It("DIJob should catch the change and request di-server to create new replicas", func() { +// testCases := []string{"coordinator", "collector", "learner", "aggregator", "ddp-learner"} +// for i := range testCases { +// tc := testCases[i] +// By(fmt.Sprintf("Create %dth DIJob", i+1)) + +// jobPath := filepath.Join(exampleJobsDir, "dijob-sidecar.yaml") +// if tc == "aggregator" || tc == "ddp-learner" { +// jobPath = filepath.Join(exampleJobsDir, "dijob-sidecar-multi-gpu.yaml") +// } + +// sharedVolumePath := filepath.Join(sharedVolumesDir, fmt.Sprintf("cartpole-%d", i)) +// job := buildDIJob(jobPath, sharedVolumePath) + +// ctx := context.Background() +// var err error + +// err = k8sClient.Create(ctx, job, &client.CreateOptions{}) +// Expect(err).NotTo(HaveOccurred()) + +// By(fmt.Sprintf("Waiting for replicas to be running")) +// err = testutil.WaitForAllReplicas(ctx, k8sClient, job, corev1.PodRunning) +// Expect(err).NotTo(HaveOccurred()) + +// By("Checking coordinator's log to decide to delete replicas") +// replicaName := fmt.Sprintf("%s-%s", job.Name, "coordinator") +// err = testutil.GetPodLogs(clientset, namespace, replicaName, dicommon.DefaultContainerName, true, messageRepresentJobStartsTraining, logTimeout) +// Expect(err).NotTo(HaveOccurred()) + +// By(fmt.Sprintf("Delete %s pod for a while", tc)) +// pods, err := diutil.ListPods(ctx, k8sClient, job) +// Expect(err).NotTo(HaveOccurred()) + +// var pod *corev1.Pod +// for _, ipod := range pods { +// if strings.Contains(ipod.Name, tc) { +// pod = ipod +// } +// } + +// By(fmt.Sprintf("Delete %s replica", pod.Name)) +// err = k8sClient.Delete(ctx, pod, &client.DeleteOptions{}) +// Expect(err).NotTo(HaveOccurred()) + +// By("Waiting for job to be succeeded") +// jobKey := types.NamespacedName{Namespace: job.Namespace, Name: job.Name} +// Eventually(func() div1alpha2.Phase { +// err := k8sClient.Get(ctx, jobKey, job) +// if err != nil { +// return div1alpha2.JobUnknown +// } +// return job.Status.Phase +// }, timeout, interval).Should(Equal(div1alpha2.JobSucceeded)) + +// testutil.CleanUpJob(ctx, k8sClient, job) +// } +// }) +// }) + +// Context("When some of DIJob's replicas are failed", func() { +// It("DIJob should catch the change and request di-server to create new replicas", func() { +// testCases := []string{"coordinator", "collector", "learner", "aggregator", "ddp-learner"} +// for i := range testCases { +// tc := testCases[i] +// By(fmt.Sprintf("Create %dth DIJob", i+1)) + +// jobPath := filepath.Join(exampleJobsDir, "dijob-sidecar.yaml") +// if tc == "aggregator" || tc == "ddp-learner" { +// jobPath = filepath.Join(exampleJobsDir, "dijob-sidecar-multi-gpu.yaml") +// } + +// sharedVolumePath := filepath.Join(sharedVolumesDir, fmt.Sprintf("cartpole-%d", i)) +// job := buildDIJob(jobPath, sharedVolumePath) + +// ctx := context.Background() +// var err error + +// err = k8sClient.Create(ctx, job, &client.CreateOptions{}) +// Expect(err).NotTo(HaveOccurred()) + +// By(fmt.Sprintf("Waiting for replicas to be running")) +// err = testutil.WaitForAllReplicas(ctx, k8sClient, job, corev1.PodRunning) +// Expect(err).NotTo(HaveOccurred()) + +// By("Checking coordinator's log to decide to failed replicas") +// replicaName := fmt.Sprintf("%s-%s", job.Name, "coordinator") +// err = testutil.GetPodLogs(clientset, namespace, replicaName, dicommon.DefaultContainerName, true, messageRepresentJobStartsTraining, logTimeout) +// Expect(err).NotTo(HaveOccurred()) + +// By(fmt.Sprintf("Make %s pod failed", tc)) +// pods, err := diutil.ListPods(ctx, k8sClient, job) +// Expect(err).NotTo(HaveOccurred()) + +// var pod *corev1.Pod +// for _, ipod := range pods { +// if strings.Contains(ipod.Name, tc) { +// pod = ipod +// } +// } + +// By(fmt.Sprintf("Make %s replica failed", pod.Name)) +// cmd := fmt.Sprintf("kubectl exec -n %s %s -c shell -- sh -c ", pod.Namespace, pod.Name) +// podCmd := `"kill -9 \$(ps -ef |grep \"/opt/conda/bin/ding\"| grep -v \"ps -ef\"|awk 'NR<2 {print \$1}')"` +// cmd = fmt.Sprintf("%s%s", cmd, podCmd) +// command := exec.Command("bash", "-c", cmd) +// _, err = command.CombinedOutput() +// Expect(err).NotTo(HaveOccurred()) + +// By("Waiting for job to be completed") +// expectedContainerStatus := "Completed" +// if tc == "coordinator" { +// expectedContainerStatus = "Error" +// } +// podKey := types.NamespacedName{Namespace: job.Namespace, Name: fmt.Sprintf("%s-%s", job.Name, tc)} +// Eventually(func() string { +// var pod *corev1.Pod +// err := k8sClient.Get(ctx, podKey, pod) +// if err != nil { +// return "Unknown" +// } +// for _, status := range pod.Status.ContainerStatuses { +// if status.Name != dicommon.DefaultContainerName { +// continue +// } +// if status.State.Terminated != nil { +// return status.State.Terminated.Reason +// } +// } +// return "Unknown" +// }, timeout, interval).Should(Equal(expectedContainerStatus)) + +// testutil.CleanUpJob(ctx, k8sClient, job) +// } +// }) +// }) +// }) + +// func buildDIJob(jobPath, sharedVolumePath string) *div1alpha2.DIJob { +// yamlFile, err := ioutil.ReadFile(jobPath) +// Expect(err).NotTo(HaveOccurred()) + +// var job div1alpha2.DIJob +// err = yaml.Unmarshal(yamlFile, &job) +// Expect(err).NotTo(HaveOccurred()) + +// if job.Labels == nil { +// job.Labels = make(map[string]string) +// } +// job.Labels["stability-test"] = "dijobs" +// return &job +// } diff --git a/pkg/common/config.go b/pkg/common/config.go index 00a0af723656c5b79709f8d2ff12f1d09334c8ba..b3a04c7952499ae87343f7880ad5c09d60054bb6 100644 --- a/pkg/common/config.go +++ b/pkg/common/config.go @@ -41,7 +41,6 @@ func GetDIServerURL() string { url := os.Getenv(ENVServerURL) if url == "" { return "di-server.di-system:8080" - } else { - return url } + return url } diff --git a/pkg/common/gpuallocator/gpu_allocator.go b/pkg/common/gpuallocator/gpu_allocator.go index f3921f8644eeccde2cfc1c54dc1bc6ea3087f0f5..b6673016872f56cb71aa37fd87faa4a42e18f183 100644 --- a/pkg/common/gpuallocator/gpu_allocator.go +++ b/pkg/common/gpuallocator/gpu_allocator.go @@ -1,4 +1,4 @@ -package gpu_allocator +package gpuallocator import ( "fmt" diff --git a/pkg/common/gpuallocator/gpu_allocator_test.go b/pkg/common/gpuallocator/gpu_allocator_test.go index 6705b55ae4fb7b01dafd8710bbfdcfd1eff00ea3..36e637a9e1127522b838313b2c7311fa24cfc76e 100644 --- a/pkg/common/gpuallocator/gpu_allocator_test.go +++ b/pkg/common/gpuallocator/gpu_allocator_test.go @@ -1,4 +1,4 @@ -package gpu_allocator +package gpuallocator import ( "testing" diff --git a/pkg/controllers/suite_test.go b/pkg/controllers/suite_test.go index f7aec9c1f9c8a50d8b143aea7e5384ed7a8cc8ce..131065ec91578ce6f7d11a27915908e152fa174a 100644 --- a/pkg/controllers/suite_test.go +++ b/pkg/controllers/suite_test.go @@ -21,7 +21,6 @@ import ( "fmt" "path/filepath" "testing" - "time" . "github.com/onsi/ginkgo" "github.com/onsi/ginkgo/config" @@ -43,9 +42,9 @@ import ( // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. const ( - timeout = 5 * time.Second - interval = 250 * time.Millisecond - duration = 200 * time.Millisecond +// timeout = 5 * time.Second +// interval = 250 * time.Millisecond +// duration = 200 * time.Millisecond ) // var cfg *rest.Config @@ -91,8 +90,8 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred()) - ctx := handler.NewContext(cfg, - context.Background(), + ctx := handler.NewContext(context.Background(), + cfg, k8sManager.GetClient(), k8sManager.GetEventRecorderFor("di-operator"), ctrl.Log.WithName("di-operator")) diff --git a/pkg/handler/context.go b/pkg/handler/context.go index c917fcd3c121ce9a3a982f782fa939ae8d36c555..35117cb045169dad57abd7d0caa96d2337cd0d8c 100644 --- a/pkg/handler/context.go +++ b/pkg/handler/context.go @@ -17,7 +17,7 @@ type Context struct { Recorder record.EventRecorder } -func NewContext(config *rest.Config, ctx context.Context, client client.Client, recorder record.EventRecorder, logger logr.Logger) *Context { +func NewContext(ctx context.Context, config *rest.Config, client client.Client, recorder record.EventRecorder, logger logr.Logger) *Context { return &Context{ config: config, ctx: ctx, diff --git a/pkg/handler/status.go b/pkg/handler/status.go index 89c85ff059f911e8545ce9a9f4a8b4f75a8a4115..aeee78a80b88634f1801e43823d8d85d0cfa193d 100644 --- a/pkg/handler/status.go +++ b/pkg/handler/status.go @@ -182,27 +182,27 @@ func (c *Context) UpdateDIJobStatusInCluster(job *div1alpha2.DIJob) error { return err } -func (r *Context) UpdateJobStatus( +func (c *Context) UpdateJobStatus( job *div1alpha2.DIJob, phase div1alpha2.Phase, reason string, msg string) { updateDIJobConditions(job, phase, reason, msg) switch phase { case div1alpha2.JobPending, div1alpha2.JobStarting: - r.Recorder.Eventf(job, corev1.EventTypeNormal, reason, msg) + c.Recorder.Eventf(job, corev1.EventTypeNormal, reason, msg) case div1alpha2.JobRunning: if job.Status.Phase != div1alpha2.JobRunning { - r.Recorder.Eventf(job, corev1.EventTypeNormal, reason, msg) + c.Recorder.Eventf(job, corev1.EventTypeNormal, reason, msg) } case div1alpha2.JobRestarting: - job.Status.Generation += 1 - r.Recorder.Eventf(job, corev1.EventTypeWarning, reason, msg) + job.Status.Generation++ + c.Recorder.Eventf(job, corev1.EventTypeWarning, reason, msg) case div1alpha2.JobFailed: job.Status.ReadyReplicas = 0 - r.Recorder.Eventf(job, corev1.EventTypeWarning, reason, msg) + c.Recorder.Eventf(job, corev1.EventTypeWarning, reason, msg) case div1alpha2.JobSucceeded: job.Status.ReadyReplicas = 0 - r.Recorder.Eventf(job, corev1.EventTypeNormal, reason, msg) + c.Recorder.Eventf(job, corev1.EventTypeNormal, reason, msg) default: - r.Recorder.Eventf(job, corev1.EventTypeNormal, reason, msg) + c.Recorder.Eventf(job, corev1.EventTypeNormal, reason, msg) } job.Status.Phase = phase } diff --git a/pkg/server/http/dijob.go b/pkg/server/http/dijob.go index bb17e2b74dad5154bdc6bb27727e82c77dded2bf..adf8707f79b9cb7579c33d7eaea1ec510c925071 100644 --- a/pkg/server/http/dijob.go +++ b/pkg/server/http/dijob.go @@ -68,10 +68,6 @@ func (s *DIServer) needMultiDDPLearnerPod(resource commontypes.ResourceQuantity) return false, nil } -func needAggregator(resource commontypes.ResourceQuantity) bool { - return resource.GPU.Value() > 1 -} - func (s *DIServer) updateDIJobStatusInCluster(job *div1alpha2.DIJob) error { var err error for i := 0; i < statusUpdateRetries; i++ { diff --git a/pkg/server/http/server.go b/pkg/server/http/server.go index 2de5065c1e64b15b2d4a844c8f4d976956f66e48..24b08b0c976783a94a6eb3e31bd756efe98ac729 100644 --- a/pkg/server/http/server.go +++ b/pkg/server/http/server.go @@ -26,9 +26,8 @@ import ( ) var ( - apiVersion = "v1alpha2" - replicasAPI = "/replicas" - replicasFailedAPI = "/replicas/failed" + apiVersion = "v1alpha2" + replicasAPI = "/replicas" ) func withAPIVersion(api string) string {