diff --git a/cmd/ms-operator.v1/app/options/options.go b/cmd/ms-operator.v1/app/options/options.go
index 2ae3845fb848b8fe3348f38a1fe1a39940c15145..86c2968c9e4a910ccc53cb3dfcb9539ffc21842a 100755
--- a/cmd/ms-operator.v1/app/options/options.go
+++ b/cmd/ms-operator.v1/app/options/options.go
@@ -37,7 +37,7 @@ func NewServerOption() *ServerOption {
 // AddFlags adds flags for a specific CMServer to the specified FlagSet
 func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
 	// chaos level will be removed once we have a formal tool to inject failures.
-	fs.IntVar(&s.ChaosLevel, "chaos-level", -1, "DO NOT USE IN PRODUCTION - level of chaos injected into the PyTorchJob created by the operator.")
+	fs.IntVar(&s.ChaosLevel, "chaos-level", -1, "DO NOT USE IN PRODUCTION - level of chaos injected into the MSJob created by the operator.")
 	fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
 	fs.DurationVar(&s.GCInterval, "gc-interval", 10*time.Minute, "GC interval")
 	fs.StringVar(&s.ControllerConfigFile, "controller-config-file", "", "Path to file containing the controller config.")
diff --git a/pkg/apis/mindspore/helper/helper.go b/pkg/apis/mindspore/helper/helper.go
index 3958a222a867754539ce17c7a89d787d947f73ff..59118b06d2649db7438aedff8f5c2390f6a3feba 100755
--- a/pkg/apis/mindspore/helper/helper.go
+++ b/pkg/apis/mindspore/helper/helper.go
@@ -106,8 +106,7 @@ func ConfigureAcceleratorsForMSJobSpec(c *msv1.MSJobSpec, accelerators map[strin
 // Cleanup cleans up user passed spec, e.g. defaulting, transforming fields.
 // TODO: move this to admission controller
 func Cleanup(c *msv1.MSJobSpec) {
-	// TODO(jlewi): Add logic to cleanup user provided spec; e.g. by filling in defaults.
-	// We should have default container images so user doesn't have to provide these.
+
 }
 
 func CRDName() string {
diff --git a/pkg/apis/mindspore/v1/types.go b/pkg/apis/mindspore/v1/types.go
index 1d58b0f31ad119b75b83f8b76c5f339ab40fdd9d..7d16f8f3713e36ff38a0e13d7deb3f70a03b1a20 100755
--- a/pkg/apis/mindspore/v1/types.go
+++ b/pkg/apis/mindspore/v1/types.go
@@ -45,7 +45,6 @@ type MSJob struct {
 }
 
 type MSJobSpec struct {
-	// TODO(jlewi): Can we we get rid of this and use some value from Kubernetes or a random ide.
 	RuntimeId string
 
 	// ReplicaSpecs specifies the MS replicas to run.
@@ -81,11 +80,9 @@ const (
 
 const (
 	DefaultMSContainer string = "mindspore"
-	DefaultMSImage string = "mindspore/mindspore:v0.1.0"
+	DefaultMSImage string = "mindspore/mindspore:v0.1.0-alpha"
 )
 
-// TODO(jlewi): We probably want to add a name field. This would allow us to have more than 1 type of each worker.
-// This might be useful if you wanted to have a separate set of workers to do eval.
 type MSReplicaSpec struct {
 	// Replicas is the number of desired replicas.
 	// This is a pointer to distinguish between explicit zero and unspecified.
diff --git a/pkg/apis/mindspore/validation/validation.go b/pkg/apis/mindspore/validation/validation.go
index 3431f1220191f113318399ddf3109409111ceeb3..5560c3e504c96cbe043f274c12b748e4e7723695 100755
--- a/pkg/apis/mindspore/validation/validation.go
+++ b/pkg/apis/mindspore/validation/validation.go
@@ -57,7 +57,7 @@ func ValidateMSJobSpec(c *msv1.MSJobSpec) error {
 		}
 
 		if !isValidReplicaType {
-			return fmt.Errorf("tfReplicaSpec.MSReplicaType is %v but must be one of %v", r.MSReplicaType, validReplicaTypes)
+			return fmt.Errorf("msReplicaSpec.MSReplicaType is %v but must be one of %v", r.MSReplicaType, validReplicaTypes)
 		}
 
 		for _, c := range r.Template.Spec.Containers {
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index a2b64fc9e659b534ad32123431d583ab7c38c9f1..53e3b05880d743c0a133f6b694d575f9504ac523 100755
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package controller provides a Kubernetes controller for a TensorFlow job resource.
+// Package controller provides a Kubernetes controller for a MindSpore job resource.
 package controller
 
 import (
@@ -85,9 +85,9 @@ type Controller struct {
 	syncHandler func(jobKey string) (bool, error)
 }
 
-func New(kubeClient kubernetes.Interface, APIExtclient apiextensionsclient.Interface, tfJobClient msjobclient.Interface,
-	config msv1.ControllerConfig, tfJobInformerFactory informers.SharedInformerFactory) (*Controller, error) {
-	tfJobInformer := tfJobInformerFactory.Kubeflow().V1().MSJobs()
+func New(kubeClient kubernetes.Interface, APIExtclient apiextensionsclient.Interface, msJobClient msjobclient.Interface,
+	config msv1.ControllerConfig, msJobInformerFactory informers.SharedInformerFactory) (*Controller, error) {
+	msJobInformer := msJobInformerFactory.Kubeflow().V1().MSJobs()
 	kubeflowscheme.AddToScheme(scheme.Scheme)
 
 	log.Debug("Creating event broadcaster")
@@ -99,22 +99,21 @@ func New(kubeClient kubernetes.Interface, APIExtclient apiextensionsclient.Inter
 	controller := &Controller{
 		KubeClient: kubeClient,
 		APIExtclient: APIExtclient,
-		MSJobClient: tfJobClient,
+		MSJobClient: msJobClient,
 		WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MSjobs"),
 		recorder: recorder,
-		// TODO(jlewi)): What to do about cluster.Cluster?
 		jobs: make(map[string]*trainer.TrainingJob),
 		config: config,
 	}
 
 	log.Info("Setting up event handlers")
 	// Set up an event handler for when Foo resources change
-	tfJobInformer.Informer().AddEventHandler(
+	msJobInformer.Informer().AddEventHandler(
 		cache.FilteringResourceEventHandler{
 			FilterFunc: func(obj interface{}) bool {
 				switch t := obj.(type) {
 				case *msv1.MSJob:
-					log.Debugf("filter tfjob name: %v", t.Name)
+					log.Debugf("filter msjob name: %v", t.Name)
 					return true
 				default:
 					return false
@@ -129,8 +128,8 @@ func New(kubeClient kubernetes.Interface, APIExtclient apiextensionsclient.Inter
 			},
 		})
 
-	controller.MSJobLister = tfJobInformer.Lister()
-	controller.MSJobSynced = tfJobInformer.Informer().HasSynced
+	controller.MSJobLister = msJobInformer.Lister()
+	controller.MSJobSynced = msJobInformer.Informer().HasSynced
 	controller.syncHandler = controller.syncMSJob
 
 	return controller, nil
@@ -216,7 +215,7 @@ func (c *Controller) syncMSJob(key string) (bool, error) {
 		return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
 	}
 
-	tfJob, err := c.MSJobLister.MSJobs(ns).Get(name)
+	msJob, err := c.MSJobLister.MSJobs(ns).Get(name)
 
 	if err != nil {
 		if apierrors.IsNotFound(err) {
@@ -228,8 +227,8 @@ func (c *Controller) syncMSJob(key string) (bool, error) {
 
 	// Create a new TrainingJob if there is no TrainingJob stored for it in the jobs map or if the UID's don't match.
 	// The UID's won't match in the event we deleted the job and then recreated the job with the same name.
-	if cJob, ok := c.jobs[key]; !ok || cJob.UID() != tfJob.UID {
-		nc, err := trainer.NewJob(c.KubeClient, c.MSJobClient, c.recorder, tfJob, &c.config)
+	if cJob, ok := c.jobs[key]; !ok || cJob.UID() != msJob.UID {
+		nc, err := trainer.NewJob(c.KubeClient, c.MSJobClient, c.recorder, msJob, &c.config)
 
 		if err != nil {
 			return false, err
@@ -243,15 +242,13 @@ func (c *Controller) syncMSJob(key string) (bool, error) {
 		return false, err
 	}
 
-	tfJob, err = c.MSJobClient.KubeflowV1().MSJobs(tfJob.ObjectMeta.Namespace).Get(tfJob.ObjectMeta.Name, metav1.GetOptions{})
+	msJob, err = c.MSJobClient.KubeflowV1().MSJobs(msJob.ObjectMeta.Namespace).Get(msJob.ObjectMeta.Name, metav1.GetOptions{})
 
 	if err != nil {
 		return false, err
 	}
 
-	// TODO(jlewi): This logic will need to change when/if we get rid of phases and move to conditions. At that
-	// case we should forget about a job when the appropriate condition is reached.
-	if tfJob.Status.Phase == msv1.MSJobPhaseCleanUp {
+	if msJob.Status.Phase == msv1.MSJobPhaseCleanUp {
 		return true, nil
 	}
 	return false, nil
diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go
index bc2decd58c26e8ed0232c3823ebe36b20c661a12..8983ab2dd1a9fba4957c3dea09cf7c772dbdc1d8 100755
--- a/pkg/trainer/replicas.go
+++ b/pkg/trainer/replicas.go
@@ -29,9 +29,8 @@ import (
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/record"
 
-	torchv1alpha1 "gitee.com/mindspore/ms-operator/pkg/apis/mindspore/v1"
+	msv1 "gitee.com/mindspore/ms-operator/pkg/apis/mindspore/v1"
 	"gitee.com/mindspore/ms-operator/pkg/util/k8sutil"
-	// TOOO(jlewi): Rename to apiErrors
 	"gitee.com/mindspore/ms-operator/pkg/apis/mindspore/helper"
 	"gitee.com/mindspore/ms-operator/pkg/util"
 )
@@ -47,53 +46,53 @@ type MSReplicaSet struct {
 	recorder record.EventRecorder
 	// Job is a pointer to the TrainingJob to which this replica belongs.
 	Job *TrainingJob
-	Spec torchv1alpha1.MSReplicaSpec
+	Spec msv1.MSReplicaSpec
 }
 
 // MSReplicas is an interface for managing a set of replicas.
 type MSReplicaSetInterface interface {
 	Create() error
 	Delete() error
-	GetStatus() (torchv1alpha1.MSReplicaStatus, error)
+	GetStatus() (msv1.MSReplicaStatus, error)
 }
 
-// MSConfig is a struct representing the TensorFlow config. This struct is turned into an environment
-// which is used by TensorFlow processes to configure themselves.
+// MSConfig is a struct representing the MindSpore config. This struct is turned into an environment
+// which is used by MindSpore processes to configure themselves.
 type MSConfig struct {
 	Cluster ClusterSpec `json:"cluster"`
 	Task TaskSpec `json:"task"`
 	Environment string `json:"environment"`
 }
 
-func NewMSReplicaSet(clientSet kubernetes.Interface, recorder record.EventRecorder, tfReplicaSpec torchv1alpha1.MSReplicaSpec, job *TrainingJob) (*MSReplicaSet, error) {
-	if tfReplicaSpec.MSReplicaType == torchv1alpha1.MASTER && *tfReplicaSpec.Replicas != 1 {
+func NewMSReplicaSet(clientSet kubernetes.Interface, recorder record.EventRecorder, msReplicaSpec msv1.MSReplicaSpec, job *TrainingJob) (*MSReplicaSet, error) {
+	if msReplicaSpec.MSReplicaType == msv1.MASTER && *msReplicaSpec.Replicas != 1 {
 		return nil, errors.New("The MASTER must have Replicas = 1")
 	}
 
-	if tfReplicaSpec.MasterPort == nil {
-		return nil, errors.New("tfReplicaSpec.MasterPort can't be nil.")
+	if msReplicaSpec.MasterPort == nil {
+		return nil, errors.New("msReplicaSpec.MasterPort can't be nil.")
 	}
 
 	// Make sure the replica type is valid.
-	validReplicaTypes := []torchv1alpha1.MSReplicaType{torchv1alpha1.MASTER, torchv1alpha1.WORKER}
+	validReplicaTypes := []msv1.MSReplicaType{msv1.MASTER, msv1.WORKER}
 
 	isValidReplicaType := false
 	for _, t := range validReplicaTypes {
-		if t == tfReplicaSpec.MSReplicaType {
+		if t == msReplicaSpec.MSReplicaType {
 			isValidReplicaType = true
 			break
 		}
 	}
 
 	if !isValidReplicaType {
-		return nil, fmt.Errorf("tfReplicaSpec.MSReplicaType is %v but must be one of %v", tfReplicaSpec.MSReplicaType, validReplicaTypes)
+		return nil, fmt.Errorf("msReplicaSpec.MSReplicaType is %v but must be one of %v", msReplicaSpec.MSReplicaType, validReplicaTypes)
 	}
 
 	return &MSReplicaSet{
 		ClientSet: clientSet,
 		recorder: recorder,
 		Job: job,
-		Spec: tfReplicaSpec,
+		Spec: msReplicaSpec,
 	}, nil
 }
 
@@ -108,7 +107,7 @@ func (s *MSReplicaSet) Labels() KubernetesLabels {
 		"ms_job_name": s.Job.job.ObjectMeta.Name})
 }
 
-func (s *MSReplicaSet) Create(config *torchv1alpha1.ControllerConfig, worldSize int32) error {
+func (s *MSReplicaSet) Create(config *msv1.ControllerConfig, worldSize int32) error {
 	// Create services
 	err := s.SyncServices()
 	if err != nil {
@@ -137,7 +136,7 @@ func (s *MSReplicaSet) CreateServiceWithIndex(index int32) (*v1.Service, error)
 			Selector: taskLabels,
 			Ports: []v1.ServicePort{
 				{
-					Name: "tf-port",
+					Name: "ms-port",
 					Port: *s.Spec.MasterPort,
 				},
 			},
@@ -173,7 +172,7 @@ func (s *MSReplicaSet) CreatePodWithIndex(index int32, worldSize int32) (*v1.Pod
 		masterAddr = "localhost"
 	}
 	rank := strconv.Itoa(int(index))
-	tfConfig := MSConfig{
+	msConfig := MSConfig{
 		Cluster: s.Job.ClusterSpec(),
 		Task: TaskSpec{
 			Type: strings.ToLower(string(s.Spec.MSReplicaType)),
@@ -183,27 +182,26 @@ func (s *MSReplicaSet) CreatePodWithIndex(index int32, worldSize int32) (*v1.Pod
 		Environment: "cloud",
 	}
 
-	tfConfigJson, err := json.Marshal(tfConfig)
+	msConfigJson, err := json.Marshal(msConfig)
 	if err != nil {
-		log.Errorf("Job: %v serializing tfConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(tfConfig), err)
+		log.Errorf("Job: %v serializing msConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(msConfig), err)
 		return nil, err
 	}
 
-	// TODO(jose5918) Do not need TF_CONFIG but leaving for POC
-	// Add TF_CONFIG environment variable.
+	// Add MS_CONFIG environment variable.
 	for i, _ := range pod.Spec.Containers {
 		// We can't get c in the loop variable because that would be by value so our modifications
 		// wouldn't have any effect.
 		c := &pod.Spec.Containers[i]
-		if c.Name != torchv1alpha1.DefaultMSContainer {
+		if c.Name != msv1.DefaultMSContainer {
 			continue
 		}
 		if len(c.Env) == 0 {
 			c.Env = make([]v1.EnvVar, 0)
 		}
 		c.Env = append(c.Env, v1.EnvVar{
-			Name: "TF_CONFIG",
-			Value: string(tfConfigJson),
+			Name: "MS_CONFIG",
+			Value: string(msConfigJson),
 		})
 		c.Env = append(c.Env, v1.EnvVar{
 			Name: "MASTER_PORT",
@@ -258,7 +256,6 @@ func (s *MSReplicaSet) Delete() error {
 	}
 
 	// Services doesn't support DeleteCollection so we delete them individually.
-	// TODO(jlewi): We should check if this has changed with K8s 1.8 or other releases.
 	for index := int32(0); index < *s.Spec.Replicas; index++ {
 		log.V(1).Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.genName((index)))
 		err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.genName(index), &meta_v1.DeleteOptions{})
@@ -269,17 +266,17 @@ func (s *MSReplicaSet) Delete() error {
 		}
 	}
 
-	// If the ConfigMap for the default parameter server exists, we delete it
-	log.Infof("Get ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
-	_, err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Get(s.defaultPSConfigMapName(), meta_v1.GetOptions{})
+	// If the ConfigMap for the default master exists, we delete it
+	log.Infof("Get ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultMasterConfigMapName())
+	_, err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Get(s.defaultMasterConfigMapName(), meta_v1.GetOptions{})
 	if err != nil {
 		if !k8sutil.IsKubernetesResourceNotFoundError(err) {
-			log.Errorf("Error deleting ConfigMap %v; %v", s.defaultPSConfigMapName(), err)
+			log.Errorf("Error deleting ConfigMap %v; %v", s.defaultMasterConfigMapName(), err)
 			failures = true
 		}
 	} else {
-		log.Infof("Delete ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
-		err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Delete(s.defaultPSConfigMapName(), &meta_v1.DeleteOptions{})
+		log.Infof("Delete ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultMasterConfigMapName())
+		err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Delete(s.defaultMasterConfigMapName(), &meta_v1.DeleteOptions{})
 		if err != nil {
 			log.Errorf("There was a problem deleting the ConfigMaps; %v", err)
 			failures = true
@@ -293,7 +290,7 @@ func (s *MSReplicaSet) Delete() error {
 }
 
 // replicaStatusFromPodList returns a status from a list of pods for a job.
-func replicaStatusFromPodList(l v1.PodList, name string) torchv1alpha1.ReplicaState {
+func replicaStatusFromPodList(l v1.PodList, name string) msv1.ReplicaState {
 	var latest *v1.Pod
 	for _, i := range l.Items {
 		if latest == nil {
@@ -306,10 +303,10 @@ func replicaStatusFromPodList(l v1.PodList, name string) torchv1alpha1.ReplicaSt
 	}
 
 	if latest == nil {
-		return torchv1alpha1.ReplicaStateRunning
+		return msv1.ReplicaStateRunning
 	}
 
-	var tfState v1.ContainerState
+	var msState v1.ContainerState
 
 	for _, i := range latest.Status.ContainerStatuses {
 		if i.Name != name {
@@ -317,45 +314,45 @@ func replicaStatusFromPodList(l v1.PodList, name string) torchv1alpha1.ReplicaSt
 		}
 
 		// We need to decide whether to use the current state or the previous termination state.
-		tfState = i.State
+		msState = i.State
 
 		// If the container previously terminated we will look at the termination to decide whether it is a retryable
 		// or permanenent error.
 		if i.LastTerminationState.Terminated != nil {
-			tfState = i.LastTerminationState
+			msState = i.LastTerminationState
 		}
 	}
 
-	if tfState.Running != nil || tfState.Waiting != nil {
-		return torchv1alpha1.ReplicaStateRunning
+	if msState.Running != nil || msState.Waiting != nil {
+		return msv1.ReplicaStateRunning
 	}
 
-	if tfState.Terminated != nil {
-		if tfState.Terminated.ExitCode == 0 {
-			return torchv1alpha1.ReplicaStateSucceeded
+	if msState.Terminated != nil {
+		if msState.Terminated.ExitCode == 0 {
+			return msv1.ReplicaStateSucceeded
 		}
 
-		if isRetryableTerminationState(tfState.Terminated) {
+		if isRetryableTerminationState(msState.Terminated) {
 			// Since its a retryable error just return RUNNING.
 			// We can just let Kubernetes restart the container to retry.
-			return torchv1alpha1.ReplicaStateRunning
+			return msv1.ReplicaStateRunning
 		}
 
-		return torchv1alpha1.ReplicaStateFailed
+		return msv1.ReplicaStateFailed
 	}
 
-	return torchv1alpha1.ReplicaStateUnknown
+	return msv1.ReplicaStateUnknown
 }
 
-func (s *MSReplicaSet) GetSingleReplicaStatus(index int32) torchv1alpha1.ReplicaState {
+func (s *MSReplicaSet) GetSingleReplicaStatus(index int32) msv1.ReplicaState {
 	p, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).Get(s.genName(index), meta_v1.GetOptions{})
 
 	if err != nil {
-		return torchv1alpha1.ReplicaStateUnknown
+		return msv1.ReplicaStateUnknown
 	}
 
 	if v1.PodSucceeded == p.Status.Phase {
-		return torchv1alpha1.ReplicaStateSucceeded
+		return msv1.ReplicaStateSucceeded
 	}
 
 	labels := s.Labels()
@@ -363,33 +360,30 @@ func (s *MSReplicaSet) GetSingleReplicaStatus(index int32) torchv1alpha1.Replica
 	selector, err := labels.ToSelector()
 	if err != nil {
 		log.Errorf("labels.ToSelector() error; %v", err)
-		return torchv1alpha1.ReplicaStateFailed
+		return msv1.ReplicaStateFailed
 	}
 
-	// TODO(jlewi): Handle errors. We need to get the pod and looking at recent container exits.
 	l, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).List(meta_v1.ListOptions{
-		// TODO(jlewi): Why isn't the label selector working?
 		LabelSelector: selector,
 	})
 
 	if err != nil {
-		// TODO(jlewi): Are there errors that should be treated as retryable errors?
-		return torchv1alpha1.ReplicaStateFailed
+		return msv1.ReplicaStateFailed
 	}
 
-	status := replicaStatusFromPodList(*l, torchv1alpha1.DefaultMSContainer)
+	status := replicaStatusFromPodList(*l, msv1.DefaultMSContainer)
 	return status
 }
 
 // Status returns the status of the replica set.
-func (s *MSReplicaSet) GetStatus() (torchv1alpha1.MSReplicaStatus, error) {
-	status := torchv1alpha1.MSReplicaStatus{
+func (s *MSReplicaSet) GetStatus() (msv1.MSReplicaStatus, error) {
+	status := msv1.MSReplicaStatus{
 		MSReplicaType: s.Spec.MSReplicaType,
-		State: torchv1alpha1.ReplicaStateUnknown,
-		ReplicasStates: make(map[torchv1alpha1.ReplicaState]int),
+		State: msv1.ReplicaStateUnknown,
+		ReplicasStates: make(map[msv1.ReplicaState]int),
 	}
 
-	increment := func(state torchv1alpha1.ReplicaState) {
+	increment := func(state msv1.ReplicaState) {
 		v, ok := status.ReplicasStates[state]
 		if ok {
 			status.ReplicasStates[state] = v + 1
@@ -405,20 +399,20 @@ func (s *MSReplicaSet) GetStatus() (torchv1alpha1.MSReplicaStatus, error) {
 	// Determine the overall status for the replica set based on the status of the individual
 	// replicas.
 	// If any of the replicas failed mark the set as failed.
-	if _, ok := status.ReplicasStates[torchv1alpha1.ReplicaStateFailed]; ok {
-		status.State = torchv1alpha1.ReplicaStateFailed
+	if _, ok := status.ReplicasStates[msv1.ReplicaStateFailed]; ok {
+		status.State = msv1.ReplicaStateFailed
 		return status, nil
 	}
 
 	// If any replicas are RUNNING mark it as RUNNING.
-	if _, ok := status.ReplicasStates[torchv1alpha1.ReplicaStateRunning]; ok {
-		status.State = torchv1alpha1.ReplicaStateRunning
+	if _, ok := status.ReplicasStates[msv1.ReplicaStateRunning]; ok {
+		status.State = msv1.ReplicaStateRunning
 		return status, nil
 	}
 
 	// If all of the replicas succeeded consider it success.
-	if v, ok := status.ReplicasStates[torchv1alpha1.ReplicaStateSucceeded]; ok && int32(v) == *s.Spec.Replicas {
-		status.State = torchv1alpha1.ReplicaStateSucceeded
+	if v, ok := status.ReplicasStates[msv1.ReplicaStateSucceeded]; ok && int32(v) == *s.Spec.Replicas {
+		status.State = msv1.ReplicaStateSucceeded
 		return status, nil
 	}
 
@@ -515,7 +509,7 @@ func (s *MSReplicaSet) SyncServices() error {
 }
 
 func (s *MSReplicaSet) genName(index int32) string {
-	// Truncate tfjob name to 40 characters
+	// Truncate msjob name to 40 characters
 	// The whole job name should be compliant with the DNS_LABEL spec, up to a max length of 63 characters
 	// Thus genName(40 chars)-replicaType(6 chars)-runtimeId(4 chars)-index(4 chars), also leaving some spaces
 	// See https://github.com/kubernetes/community/blob/master/contributors/design-proposals/architecture/identifiers.md
@@ -527,7 +521,7 @@ func (s *MSReplicaSet) genPodName(index int32) string {
 	return s.genName(index) + "-" + util.RandString(5)
 }
 
-func (s *MSReplicaSet) defaultPSConfigMapName() string {
+func (s *MSReplicaSet) defaultMasterConfigMapName() string {
 	return fmt.Sprintf("cm-ps-%v", s.Job.job.Spec.RuntimeId)
 }
 
diff --git a/pkg/trainer/training.go b/pkg/trainer/training.go
index 7b743b1fc96d86d2184853b06eda48080952d0dd..528879c07612043c005505e61b2711f7e7280ca6 100755
--- a/pkg/trainer/training.go
+++ b/pkg/trainer/training.go
@@ -34,8 +34,6 @@ import (
 	"gitee.com/mindspore/ms-operator/pkg/util"
 )
 
-// TODO(jlewi): We should switch a New pattern and make trainingJob private so we can
-// ensure correctness on creation.
 type TrainingJob struct {
 	job *msv1.MSJob
 
@@ -55,9 +53,7 @@ type TrainingJob struct {
 	memberCounter int
 }
 
-// TODO(jose5918): We don't really need the cluster spec for this operator but no harm in leaving it for POC
-// ClusterSpec represents a cluster TensorFlow specification.
-// https://www.tensorflow.org/deploy/distributed#create_a_tftrainclusterspec_to_describe_the_cluster
+// ClusterSpec represents a cluster MindSpore specification.
 // It is a map from job names to network addresses.
 type ClusterSpec map[string][]string
 
@@ -110,7 +106,6 @@ func (j *TrainingJob) ClusterSpec() ClusterSpec {
 
 // createResources creates all the replicas if requested
 func (j *TrainingJob) createResources(config *msv1.ControllerConfig) error {
-	// TODO(jose5918) Need to figure out where it is best to add worldSize logic
 	// Get MS worldSize by adding replicas
 	worldSize := int32(0)
 	for _, r := range j.Replicas {
@@ -144,7 +139,6 @@ func (j *TrainingJob) GetStatus() (msv1.State, []*msv1.MSReplicaStatus, error) {
 	replicaStatuses := make([]*msv1.MSReplicaStatus, 0)
 
 	// The state for each replica.
-	// TODO(jlewi): We will need to modify this code if we want to allow multiples of a given type of replica.
 	replicaSetStates := make(map[msv1.MSReplicaType]msv1.ReplicaState)
 
 	for _, r := range j.Replicas {
@@ -176,8 +170,6 @@ func (j *TrainingJob) GetStatus() (msv1.State, []*msv1.MSReplicaStatus, error) {
 // isRetryableTerminationState returns true if a container terminated in a state
 // that we consider retryable.
 func isRetryableTerminationState(s *v1.ContainerStateTerminated) bool {
-	// TODO(jlewi): Need to match logic in
-	// https://cs.corp.google.com/piper///depot/google3/cloud/ml/beta/job/training_job_state_util.cc?l=88
 	if s.Reason == "OOMKilled" {
 		// If the user's process causes an OOM and Docker kills the container,
 		// the termination reason of ContainerState will be specified to
@@ -189,8 +181,6 @@ func isRetryableTerminationState(s *v1.ContainerStateTerminated) bool {
 		return false
 	}
 
-	// TODO(jlewi): Should we use the exit code reported in the termination
-	// log message and not the ExitCode reported by the container.
 
 	if s.ExitCode >= 0 && s.ExitCode <= 127 {
 		// For the exit_code in [0, 127]:
@@ -271,19 +261,12 @@ func (j *TrainingJob) setupReplicas() error {
 }
 
 func (j *TrainingJob) Delete() {
-	// TODO(jlewi): Delete is what should cause us to delete the Pods.
-	// we shouldn't delete the pods when the jobs finish because leaving the pods
-	// allows us to get the logs from the pods after the job finishes.
-	// log.Infof("MSJob %v deleted by the user", j.fullname())
 
 	// TODO(jlewi): This logic is probably insufficient.
 	if j.job.Status.Phase != msv1.MSJobPhaseCleanUp {
 		j.status.Phase = msv1.MSJobPhaseCleanUp
 	}
 
-	// TODO(jlewi): Does it make sense to explicitly delete the resources? Should
-	// we just rely on K8s garbage collection to delete the resources before
-	// deleting MSJob?
 	if cErr := j.deleteResources(); cErr != nil {
 		log.Errorf("trainingJob.deleteResources() error; %v", cErr)
 	}
@@ -331,9 +314,6 @@ func (j *TrainingJob) Reconcile(config *msv1.ControllerConfig) error {
 		return err
 	}
 
-	// TODO(jlewi): Can we determine from the CRD status whether we should
-	// Create the resources or not? We need to ensure the resources exist so for
-	// now we always call Create.
 	if j.job.Status.Phase == msv1.MSJobPhaseCreating || j.job.Status.Phase == msv1.MSJobPhaseRunning {
 		// We call Create to make sure all the resources exist and are running.
 		if cErr := j.createResources(config); cErr != nil {
@@ -354,7 +334,6 @@ func (j *TrainingJob) Reconcile(config *msv1.ControllerConfig) error {
 		log.Errorf("GetStatus() for job %v returned error: %v", j.job.ObjectMeta.Name, err)
 		return err
 	}
-	// TODO(jlewi): We should update the Phase if we detect the job is done.
 	if state == msv1.StateFailed {
 		log.Errorf("Master failed Job: %v.", j.job.ObjectMeta.Name)
 		j.status.Phase = msv1.MSJobPhaseDone
@@ -367,7 +346,6 @@ func (j *TrainingJob) Reconcile(config *msv1.ControllerConfig) error {
 			log.Infof("Job %v status=%v", j.job.ObjectMeta.Name, util.Pformat(j.status))
 		}
 	}
-	// TODO(jose5918) Need to figure out where it is best to add worldSize logic
 	// Get MS worldSize by adding replicas
 	worldSize := int32(0)
 	for _, r := range j.Replicas {
diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go
index 73b2eaa632f8f7084d39713e04b760f7012467cd..4b8ecf8a6e3345d631428719bad784590fc276c1 100755
--- a/pkg/util/k8sutil/k8sutil.go
+++ b/pkg/util/k8sutil/k8sutil.go
@@ -33,8 +33,6 @@ import (
 
 const RecommendedConfigPathEnvVar = "KUBECONFIG"
 
-// TODO(jlewi): I think this function is used to add an owner to a resource. I think we we should use this
-// method to ensure all resources created for the TFJob are owned by the TFJob.
 func addOwnerRefToObject(o metav1.Object, r metav1.OwnerReference) {
 	o.SetOwnerReferences(append(o.GetOwnerReferences(), r))
 }
@@ -99,15 +97,11 @@ func JobListOpt(clusterName string) metav1.ListOptions {
 
 func LabelsForJob(jobName string) map[string]string {
 	return map[string]string{
-		// TODO(jlewi): Need to set appropriate labels for TF.
 		"ms_job": jobName,
 		"app": msv1.AppLabel,
 	}
 }
 
-// TODO(jlewi): CascadeDeletOptions are part of garbage collection policy.
-// Do we want to use this? See
-// https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
 func CascadeDeleteOptions(gracePeriodSeconds int64) *metav1.DeleteOptions {
 	return &metav1.DeleteOptions{
 		GracePeriodSeconds: func(t int64) *int64 { return &t }(gracePeriodSeconds),