From 7de9c945f795c784d78d0cd9ffebcbde484c4715 Mon Sep 17 00:00:00 2001 From: liqingping Date: Thu, 25 Mar 2021 20:50:30 +0800 Subject: [PATCH] feat: implement nervex-server add function 1. implement nervex-server add function 2. move common function to util 3. add dockerfile for server 4. rm useless config --- Dockerfile | 1 - Dockerfile.server | 5 + Makefile | 15 +++ config/certmanager/certificate.yaml | 25 ---- config/certmanager/kustomization.yaml | 5 - config/certmanager/kustomizeconfig.yaml | 16 --- ...vex.sensetime.com_actorlearnerconfigs.yaml | 8 +- config/default/kustomization.yaml | 2 +- config/default/manager_auth_proxy_patch.yaml | 14 +-- config/default/manager_config_patch.yaml | 4 +- config/manager/kustomization.yaml | 3 +- config/manager/manager.yaml | 10 +- config/manager/nervex_server.yaml | 53 +++++++++ config/prometheus/kustomization.yaml | 2 - config/prometheus/monitor.yaml | 16 --- config/rbac/auth_proxy_service.yaml | 8 +- .../nervex_v1alpha1_actorleanerconfig.yaml | 8 +- config/samples/nervex_v1alpha1_nervexjob.yaml | 3 +- controllers/pod.go | 22 ++-- examples/mock-coordinator/main.go | 5 + server/dynamic/informer.go | 8 +- server/http/health.go | 22 ++++ server/http/server.go | 112 ++++++++++++------ server/main.go | 8 +- util/const.go | 4 +- util/util.go | 51 +++++++- 26 files changed, 273 insertions(+), 157 deletions(-) create mode 100644 Dockerfile.server delete mode 100644 config/certmanager/certificate.yaml delete mode 100644 config/certmanager/kustomization.yaml delete mode 100644 config/certmanager/kustomizeconfig.yaml create mode 100644 config/manager/nervex_server.yaml delete mode 100644 config/prometheus/kustomization.yaml delete mode 100644 config/prometheus/monitor.yaml create mode 100644 examples/mock-coordinator/main.go create mode 100644 server/http/health.go diff --git a/Dockerfile b/Dockerfile index 0f38b9f..4b5b1c6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,6 +22,5 @@ RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o manager FROM registry.sensetime.com/cloudnative4ai/ubi:v1.0.0 WORKDIR / COPY --from=builder /workspace/manager . -USER 65532:65532 ENTRYPOINT ["/manager"] diff --git a/Dockerfile.server b/Dockerfile.server new file mode 100644 index 0000000..6d0dc6f --- /dev/null +++ b/Dockerfile.server @@ -0,0 +1,5 @@ +FROM registry.sensetime.com/cloudnative4ai/ubi:v1.0.0 +WORKDIR / +COPY /bin/nervex-server . + +ENTRYPOINT ["/nervex-server"] diff --git a/Makefile b/Makefile index 236c3d1..88a0a59 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,7 @@ endif # Image URL to use all building/pushing image targets IMG ?= registry.sensetime.com/cloudnative4ai/nervex-operator:${VERSION} +SERVER_IMG ?= registry.sensetime.com/cloudnative4ai/nervex-server:${VERSION} # Produce CRDs that work back to Kubernetes 1.11 (no version conversion) CRD_OPTIONS ?= "crd:trivialVersions=true,preserveUnknownFields=false" @@ -52,6 +53,11 @@ manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and Cust ./hack/update-image-tags.sh config/manager ${MASTER_VERSION} ./hack/update-version.sh ${MASTER_VERSION} +dev-manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and CustomResourceDefinition objects. + $(CONTROLLER_GEN) $(CRD_OPTIONS) rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases + ./hack/update-image-tags.sh config/manager ${VERSION} + ./hack/update-version.sh ${VERSION} + generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations. $(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..." @@ -75,18 +81,21 @@ test: manifests generate fmt vet ## Run tests. build: generate fmt vet ## Build manager binary. go build -o bin/manager main.go + go build -o bin/nervex-server server/main.go run: manifests generate fmt vet ## Run a controller from your host. go run ./main.go dev-images: build docker build -t ${IMG} -f Dockerfile.dev . + docker build -t ${SERVER_IMG} -f Dockerfile.server . docker-build: ## Build docker image with the manager. docker build -t ${IMG} . docker-push: ## Push docker image with the manager. docker push ${IMG} + docker push ${SERVER_IMG} ##@ Deployment @@ -100,9 +109,15 @@ deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG} $(KUSTOMIZE) build config/default | kubectl apply -f - +dev-deploy: dev-manifests kustomize ## Deploy controller to the K8s cluster specified in ~/.kube/config. + cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG} + $(KUSTOMIZE) build config/default | kubectl apply -f - + undeploy: ## Undeploy controller from the K8s cluster specified in ~/.kube/config. $(KUSTOMIZE) build config/default | kubectl delete -f - +dev-undeploy: ## Undeploy controller from the K8s cluster specified in ~/.kube/config. + $(KUSTOMIZE) build config/default | kubectl delete -f - CONTROLLER_GEN = $(shell pwd)/bin/controller-gen controller-gen: ## Download controller-gen locally if necessary. diff --git a/config/certmanager/certificate.yaml b/config/certmanager/certificate.yaml deleted file mode 100644 index 52d8661..0000000 --- a/config/certmanager/certificate.yaml +++ /dev/null @@ -1,25 +0,0 @@ -# The following manifests contain a self-signed issuer CR and a certificate CR. -# More document can be found at https://docs.cert-manager.io -# WARNING: Targets CertManager v1.0. Check https://cert-manager.io/docs/installation/upgrading/ for breaking changes. -apiVersion: cert-manager.io/v1 -kind: Issuer -metadata: - name: selfsigned-issuer - namespace: system -spec: - selfSigned: {} ---- -apiVersion: cert-manager.io/v1 -kind: Certificate -metadata: - name: serving-cert # this name should match the one appeared in kustomizeconfig.yaml - namespace: system -spec: - # $(SERVICE_NAME) and $(SERVICE_NAMESPACE) will be substituted by kustomize - dnsNames: - - $(SERVICE_NAME).$(SERVICE_NAMESPACE).svc - - $(SERVICE_NAME).$(SERVICE_NAMESPACE).svc.cluster.local - issuerRef: - kind: Issuer - name: selfsigned-issuer - secretName: webhook-server-cert # this secret will not be prefixed, since it's not managed by kustomize diff --git a/config/certmanager/kustomization.yaml b/config/certmanager/kustomization.yaml deleted file mode 100644 index bebea5a..0000000 --- a/config/certmanager/kustomization.yaml +++ /dev/null @@ -1,5 +0,0 @@ -resources: -- certificate.yaml - -configurations: -- kustomizeconfig.yaml diff --git a/config/certmanager/kustomizeconfig.yaml b/config/certmanager/kustomizeconfig.yaml deleted file mode 100644 index 90d7c31..0000000 --- a/config/certmanager/kustomizeconfig.yaml +++ /dev/null @@ -1,16 +0,0 @@ -# This configuration is for teaching kustomize how to update name ref and var substitution -nameReference: -- kind: Issuer - group: cert-manager.io - fieldSpecs: - - kind: Certificate - group: cert-manager.io - path: spec/issuerRef/name - -varReference: -- kind: Certificate - group: cert-manager.io - path: spec/commonName -- kind: Certificate - group: cert-manager.io - path: spec/dnsNames diff --git a/config/crd/bases/nervex.sensetime.com_actorlearnerconfigs.yaml b/config/crd/bases/nervex.sensetime.com_actorlearnerconfigs.yaml index 2de7145..746cb77 100644 --- a/config/crd/bases/nervex.sensetime.com_actorlearnerconfigs.yaml +++ b/config/crd/bases/nervex.sensetime.com_actorlearnerconfigs.yaml @@ -20,16 +20,16 @@ spec: versions: - additionalPrinterColumns: - jsonPath: .status.Actors.Total - name: TotalActors + name: Total-Actors type: integer - jsonPath: .status.Actors.Active - name: ActiveActors + name: Active-Actors type: integer - jsonPath: .status.Learners.Total - name: TotalLearners + name: Total-Learners type: integer - jsonPath: .status.Learners.Active - name: ActiveLearners + name: Active-Learners type: integer name: v1alpha1 schema: diff --git a/config/default/kustomization.yaml b/config/default/kustomization.yaml index ff807f6..e967725 100644 --- a/config/default/kustomization.yaml +++ b/config/default/kustomization.yaml @@ -6,7 +6,7 @@ namespace: nervex-system # "wordpress" becomes "alices-wordpress". # Note that it should also match with the prefix (text before '-') of the namespace # field above. -namePrefix: nervex- +# namePrefix: nervex- # Labels to add to all resources and selectors. #commonLabels: diff --git a/config/default/manager_auth_proxy_patch.yaml b/config/default/manager_auth_proxy_patch.yaml index a224be1..0b5d1dc 100644 --- a/config/default/manager_auth_proxy_patch.yaml +++ b/config/default/manager_auth_proxy_patch.yaml @@ -3,22 +3,12 @@ apiVersion: apps/v1 kind: Deployment metadata: - name: controller-manager - namespace: system + name: nervex-operator + namespace: nervex-system spec: template: spec: containers: - - name: kube-rbac-proxy - image: gcr.io/kubebuilder/kube-rbac-proxy:v0.8.0 - args: - - "--secure-listen-address=0.0.0.0:8443" - - "--upstream=http://127.0.0.1:8080/" - - "--logtostderr=true" - - "--v=10" - ports: - - containerPort: 8443 - name: https - name: manager args: - "--health-probe-bind-address=:8081" diff --git a/config/default/manager_config_patch.yaml b/config/default/manager_config_patch.yaml index 6c40015..1c00cd8 100644 --- a/config/default/manager_config_patch.yaml +++ b/config/default/manager_config_patch.yaml @@ -1,8 +1,8 @@ apiVersion: apps/v1 kind: Deployment metadata: - name: controller-manager - namespace: system + name: nervex-operator + namespace: nervex-system spec: template: spec: diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 3511ab4..75d2ab9 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -1,6 +1,7 @@ resources: - manager.yaml - priority_class.yaml +- nervex_server.yaml generatorOptions: disableNameSuffixHash: true @@ -14,4 +15,4 @@ kind: Kustomization images: - name: controller newName: registry.sensetime.com/cloudnative4ai/nervex-operator - newTag: v0.0.1-alpha.0-962d399c + newTag: v0.0.1-alpha.0-63ba161d diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index 42b2cbe..1f03a59 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -2,25 +2,25 @@ apiVersion: v1 kind: Namespace metadata: labels: - control-plane: controller-manager + control-plane: nervex-operator name: nervex-system --- apiVersion: apps/v1 kind: Deployment metadata: - name: controller-manager + name: nervex-operator namespace: nervex-system labels: - control-plane: controller-manager + control-plane: nervex-operator spec: selector: matchLabels: - control-plane: controller-manager + control-plane: nervex-operator replicas: 1 template: metadata: labels: - control-plane: controller-manager + control-plane: nervex-operator spec: containers: - command: diff --git a/config/manager/nervex_server.yaml b/config/manager/nervex_server.yaml new file mode 100644 index 0000000..b8d7155 --- /dev/null +++ b/config/manager/nervex_server.yaml @@ -0,0 +1,53 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: nervex-server + namespace: nervex-system + labels: + control-plane: nervex-server +spec: + selector: + matchLabels: + control-plane: nervex-server + replicas: 1 + template: + metadata: + labels: + control-plane: nervex-server + spec: + containers: + - command: + - /nervex-server + image: registry.sensetime.com/cloudnative4ai/nervex-server:v0.0.1-alpha.0 + imagePullPolicy: Always + name: server + securityContext: + allowPrivilegeEscalation: false + livenessProbe: + httpGet: + path: /healthz + port: 8080 + initialDelaySeconds: 15 + periodSeconds: 20 + resources: + limits: + cpu: 100m + memory: 30Mi + requests: + cpu: 100m + memory: 20Mi + terminationGracePeriodSeconds: 10 + +--- +apiVersion: v1 +kind: Service +metadata: + name: nervex-server + namespace: nervex-system +spec: + selector: + control-plane: nervex-server + ports: + - protocol: TCP + port: 8080 + targetPort: 8080 diff --git a/config/prometheus/kustomization.yaml b/config/prometheus/kustomization.yaml deleted file mode 100644 index ed13716..0000000 --- a/config/prometheus/kustomization.yaml +++ /dev/null @@ -1,2 +0,0 @@ -resources: -- monitor.yaml diff --git a/config/prometheus/monitor.yaml b/config/prometheus/monitor.yaml deleted file mode 100644 index 376e5af..0000000 --- a/config/prometheus/monitor.yaml +++ /dev/null @@ -1,16 +0,0 @@ - -# Prometheus Monitor Service (Metrics) -apiVersion: monitoring.coreos.com/v1 -kind: ServiceMonitor -metadata: - labels: - control-plane: controller-manager - name: controller-manager-metrics-monitor - namespace: nervex-system -spec: - endpoints: - - path: /metrics - port: https - selector: - matchLabels: - control-plane: controller-manager diff --git a/config/rbac/auth_proxy_service.yaml b/config/rbac/auth_proxy_service.yaml index 4ce22a0..5a44b80 100644 --- a/config/rbac/auth_proxy_service.yaml +++ b/config/rbac/auth_proxy_service.yaml @@ -2,13 +2,13 @@ apiVersion: v1 kind: Service metadata: labels: - control-plane: controller-manager - name: controller-manager-metrics-service + control-plane: nervex-operator + name: nervex-operator-metrics-service namespace: nervex-system spec: ports: - name: https port: 8443 - targetPort: https + targetPort: 8080 selector: - control-plane: controller-manager + control-plane: nervex-operator diff --git a/config/samples/nervex_v1alpha1_actorleanerconfig.yaml b/config/samples/nervex_v1alpha1_actorleanerconfig.yaml index 4a003c2..fc269e1 100644 --- a/config/samples/nervex_v1alpha1_actorleanerconfig.yaml +++ b/config/samples/nervex_v1alpha1_actorleanerconfig.yaml @@ -30,11 +30,9 @@ spec: requests: cpu: 2 memory: 5Gi - nvidia.com/gpu: 1 limits: cpu: 2 memory: 5Gi - nvidia.com/gpu: 1 volumeMounts: - name: config mountPath: /data/nervex @@ -71,11 +69,9 @@ spec: requests: cpu: 2 memory: 5Gi - nvidia.com/gpu: 1 limits: cpu: 2 memory: 5Gi - nvidia.com/gpu: 1 volumeMounts: - name: config mountPath: /data/nervex @@ -101,11 +97,9 @@ spec: requests: cpu: 2 memory: 5Gi - nvidia.com/gpu: 1 limits: cpu: 2 memory: 5Gi - nvidia.com/gpu: 1 env: - name: LC_ALL value: "en_US.utf-8" @@ -121,7 +115,7 @@ spec: cd /data/nervex/test_atari/test3; python3 -u -c 'import os; import nervex.entry.parallel_entry as pe; pe.launch_learner_aggregator(filename="atari_impala_default_config.py.pkl", name="aggregator{}".format(os.environ["HOSTNAME"].split("-")[-1]) )'; ports: - - name: aggregator-port + - name: aggt-port containerPort: 22272 volumeMounts: - name: config diff --git a/config/samples/nervex_v1alpha1_nervexjob.yaml b/config/samples/nervex_v1alpha1_nervexjob.yaml index 86a1b50..457b825 100644 --- a/config/samples/nervex_v1alpha1_nervexjob.yaml +++ b/config/samples/nervex_v1alpha1_nervexjob.yaml @@ -26,7 +26,8 @@ spec: python3 -u -c 'import nervex.entry.parallel_entry as pe; pe.launch_coordinator(filename="atari_impala_default_config.py.pkl")'; while true; do sleep 30; done; ports: - - containerPort: 22273 + - name: cood-port + containerPort: 22273 resources: requests: cpu: 2 diff --git a/controllers/pod.go b/controllers/pod.go index 27ae086..e1c618e 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -49,17 +49,21 @@ func (r *NervexJobReconciler) createPod(ctx context.Context, job *nervexv1alpha1 labels[nervexutil.ReplicaTypeLabel] = replicaType nervexutil.AddLabelsToPodTemplate(podTemplate, labels) - // add env - for i := range podTemplate.Spec.Containers { - if len(podTemplate.Spec.Containers[i].Env) == 0 { - podTemplate.Spec.Containers[i].Env = make([]corev1.EnvVar, 0) - } - podTemplate.Spec.Containers[i].Env = append(podTemplate.Spec.Containers[i].Env, corev1.EnvVar{ - Name: "KUBERNETES_POD_NAMESPACE", - Value: podTemplate.Namespace, - }) + // get port + port, ok := nervexutil.GetPortFromPodTemplate(podTemplate, nervexutil.DefaultCoordinatorContainerName, nervexutil.DefaultCoordinatorPortName) + if !ok { + port = nervexutil.DefaultCoordinatorPort + log.Info("no port found, use default port", "port", port) + nervexutil.SetPodTemplatePort(podTemplate, nervexutil.DefaultCoordinatorContainerName, nervexutil.DefaultCoordinatorPortName, port) } + // add env + envs := make(map[string]string, 0) + envs["KUBERNETES_POD_NAMESPACE"] = podTemplate.Namespace + envs["KUBERNETES_POD_NAME"] = podTemplate.Name + envs["COORDINATOR_PORT"] = fmt.Sprintf("%d", port) + nervexutil.SetPodTemplateEnv(podTemplate, envs) + // set owner reference ownRefer := metav1.OwnerReference{ APIVersion: job.APIVersion, diff --git a/examples/mock-coordinator/main.go b/examples/mock-coordinator/main.go new file mode 100644 index 0000000..7905807 --- /dev/null +++ b/examples/mock-coordinator/main.go @@ -0,0 +1,5 @@ +package main + +func main() { + +} diff --git a/server/dynamic/informer.go b/server/dynamic/informer.go index 16eafff..47b129c 100644 --- a/server/dynamic/informer.go +++ b/server/dynamic/informer.go @@ -1,6 +1,7 @@ package dynamic import ( + "log" "time" corev1 "k8s.io/api/core/v1" @@ -9,15 +10,14 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/tools/cache" - "sigs.k8s.io/controller-runtime/pkg/log" ) var ( resyncPeriod = 30 * time.Second ) -func NewDynamicInformer(dynamicClient dynamic.Interface, gvr schema.GroupVersionResource) cache.SharedIndexInformer { - dif := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, resyncPeriod, corev1.NamespaceAll, nil) +func NewDynamicInformer(dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, tweakListOptions dynamicinformer.TweakListOptionsFunc) cache.SharedIndexInformer { + dif := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, resyncPeriod, corev1.NamespaceAll, tweakListOptions) dynamicInformer := dif.ForResource(gvr).Informer() return dynamicInformer } @@ -27,7 +27,7 @@ func AddEventHandlers(s cache.SharedIndexInformer) { cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { // on add alconfig - log.Log.Info("new object", obj.(*unstructured.Unstructured).GetName()) + log.Printf("new object: %s/%s", obj.(*unstructured.Unstructured).GetNamespace(), obj.(*unstructured.Unstructured).GetName()) }, UpdateFunc: func(old, new interface{}) { // on update alconfig diff --git a/server/http/health.go b/server/http/health.go new file mode 100644 index 0000000..bf3958b --- /dev/null +++ b/server/http/health.go @@ -0,0 +1,22 @@ +package http + +import ( + "net/http" + "sync/atomic" +) + +// healthz is a liveness probe. +func healthz(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) +} + +// readyz is a readiness probe. +func readyz(isReady *atomic.Value) http.HandlerFunc { + return func(w http.ResponseWriter, _ *http.Request) { + if isReady == nil || !isReady.Load().(bool) { + http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) + } +} diff --git a/server/http/server.go b/server/http/server.go index f61a8ad..3d5d0d9 100644 --- a/server/http/server.go +++ b/server/http/server.go @@ -35,10 +35,10 @@ type NervexJobRequest struct { } type ResourceQuantity struct { - Number int `json:"number"` - CPUs resource.Quantity `json:"cpus"` - GPUs resource.Quantity `json:"gpus"` - Memory resource.Quantity `json:"memory"` + Replicas int `json:"replicas"` + Cpu resource.Quantity `json:"cpus"` + Gpu resource.Quantity `json:"gpus"` + Memory resource.Quantity `json:"memory"` } type NervexJobResponse struct { @@ -70,6 +70,7 @@ func (s *NervexServer) Start() error { log := s.Log.WithName("NervexServer") http.HandleFunc("/api/v1alpha1/add", s.Add) http.HandleFunc("/api/v1alpha1/delete", s.Delete) + http.HandleFunc("/healthz", healthz) log.Info("Start listening on :8080") http.ListenAndServe(":8080", nil) @@ -77,7 +78,7 @@ func (s *NervexServer) Start() error { } func (s *NervexServer) Add(w http.ResponseWriter, r *http.Request) { - log := s.Log.WithName("NernexServer") + log := s.Log.WithName("NervexServer") // parse request body var njreq NervexJobRequest @@ -91,7 +92,8 @@ func (s *NervexServer) Add(w http.ResponseWriter, r *http.Request) { // get ALConfig obj, exists, err := s.alconfigDyInformer.GetIndexer().GetByKey(s.alconfig) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + errMsg := fmt.Sprintf("failed to get ALConfig: %s", err) + http.Error(w, errMsg, http.StatusInternalServerError) return } @@ -110,7 +112,8 @@ func (s *NervexServer) Add(w http.ResponseWriter, r *http.Request) { // get coordinator coordinator, err := s.KubeClient.CoreV1().Pods(njreq.Namespace).Get(context.Background(), njreq.Coordinator, metav1.GetOptions{}) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + errMsg := fmt.Sprintf("failed to get coordinator: %s", err) + http.Error(w, errMsg, http.StatusInternalServerError) return } @@ -131,7 +134,8 @@ func (s *NervexServer) Add(w http.ResponseWriter, r *http.Request) { njKey := fmt.Sprintf("%s/%s", njreq.Namespace, ownRefer.Name) obj, exists, err = s.njDyInformer.GetIndexer().GetByKey(njKey) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + errMsg := fmt.Sprintf("failed to get NervexJob: %s", err) + http.Error(w, errMsg, http.StatusInternalServerError) return } @@ -146,7 +150,8 @@ func (s *NervexServer) Add(w http.ResponseWriter, r *http.Request) { // create actors and learners actors, learners, err := s.createActorsAndLearnersFromALConfig(alconfig, &njreq, ownRefer) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + errMsg := fmt.Sprintf("failed create actors and learners: %s", err) + http.Error(w, errMsg, http.StatusInternalServerError) return } @@ -160,7 +165,8 @@ func (s *NervexServer) Add(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) repJson, err := json.Marshal(rep) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + errMsg := fmt.Sprintf("failed to marshal json: %s", err) + http.Error(w, errMsg, http.StatusInternalServerError) return } w.Write(repJson) @@ -173,7 +179,7 @@ func (s *NervexServer) createActorsAndLearnersFromALConfig( // create actors actorTemplate := alconfig.Spec.Actor.Template - actors, err := s.createPodsAndServices(&actorTemplate, ownRefer, njreq.Namespace, njreq.Coordinator, njreq.Actors.Number, + actors, err := s.createPodsAndServices(&actorTemplate, ownRefer, njreq, nervexutil.ActorName, nervexutil.DefaultActorContainerName, nervexutil.DefaultActorPortName, nervexutil.DefaultActorPort) if err != nil { @@ -182,7 +188,7 @@ func (s *NervexServer) createActorsAndLearnersFromALConfig( // create learners learnerTemplate := alconfig.Spec.Learner.Template - learners, err := s.createPodsAndServices(&learnerTemplate, ownRefer, njreq.Namespace, njreq.Coordinator, njreq.Learners.Number, + learners, err := s.createPodsAndServices(&learnerTemplate, ownRefer, njreq, nervexutil.LearnerName, nervexutil.DefaultLearnerContainerName, nervexutil.DefaultLearnerPortName, nervexutil.DefaultLearnerPort) if err != nil { @@ -195,46 +201,58 @@ func (s *NervexServer) createActorsAndLearnersFromALConfig( func (s *NervexServer) createPodsAndServices( template *corev1.PodTemplateSpec, ownRefer metav1.OwnerReference, - ns, coordinator string, - replicas int, + njreq *NervexJobRequest, replicaType, containerName, portName string, defaultPort int32) ([]string, error) { + log := s.Log.WithName("NervexServer") + coordinator := njreq.Coordinator + ns := njreq.Namespace + + portEnv := "" + resources := ResourceQuantity{} + if replicaType == nervexutil.ActorName { + resources = njreq.Actors + portEnv = "ACTOR_PORT" + } else if replicaType == nervexutil.LearnerName { + resources = njreq.Learners + portEnv = "LEARNER_PORT" + } + // add labels to pod template labels := nervexutil.GenLabels(ownRefer.Name) labels[nervexutil.ReplicaTypeLabel] = replicaType nervexutil.AddLabelsToPodTemplate(template, labels) - // generate pod from template - pod := nervexutil.BuildPodFromTemplate(template.DeepCopy()) - pod.GenerateName = fmt.Sprintf("%s-%s-", coordinator, replicaType) - pod.Namespace = ns - pod.SetOwnerReferences([]metav1.OwnerReference{ownRefer}) - + // setup pod template + template.GenerateName = fmt.Sprintf("%s-%s-", coordinator, replicaType) + template.SetOwnerReferences([]metav1.OwnerReference{ownRefer}) + template.Namespace = ns + // set pod resource + SetPodTemplateResources(template, resources, containerName) // get pod port - port, ok := nervexutil.GetPortFromPod(pod, containerName, portName) + port, ok := nervexutil.GetPortFromPodTemplate(template, containerName, portName) if !ok { port = defaultPort + log.Info("no port found, use default port", "port", port) + nervexutil.SetPodTemplatePort(template, containerName, portName, port) } + // add env + envs := make(map[string]string, 0) + envs[portEnv] = fmt.Sprintf("%d", port) + nervexutil.SetPodTemplateEnv(template, envs) + + // build pod + pod := nervexutil.BuildPodFromTemplate(template.DeepCopy()) + // build service - svc := corev1.Service{ - Spec: corev1.ServiceSpec{ - ClusterIP: "None", - Selector: labels, - Ports: []corev1.ServicePort{ - { - Port: port, - Name: portName, - }, - }, - }, - } + svc := nervexutil.BuildService(labels, port, portName) svc.Labels = labels svc.SetOwnerReferences([]metav1.OwnerReference{ownRefer}) results := []string{} // create pods and services - for i := 0; i < replicas; i++ { + for i := 0; i < resources.Replicas; i++ { tempPod := pod.DeepCopy() newPod, err := s.KubeClient.CoreV1().Pods(ns).Create(context.Background(), tempPod, metav1.CreateOptions{}) if err != nil { @@ -255,6 +273,32 @@ func (s *NervexServer) createPodsAndServices( return results, nil } +func SetPodTemplateResources(template *corev1.PodTemplateSpec, resources ResourceQuantity, containerName string) { + for i := range template.Spec.Containers { + if template.Spec.Containers[i].Name != containerName { + continue + } + if template.Spec.Containers[i].Resources.Limits == nil { + template.Spec.Containers[i].Resources.Limits = make(corev1.ResourceList, 0) + } + if template.Spec.Containers[i].Resources.Requests == nil { + template.Spec.Containers[i].Resources.Requests = make(corev1.ResourceList, 0) + } + + // cpu and memory must not be zero + if !resources.Cpu.IsZero() { + template.Spec.Containers[i].Resources.Limits[corev1.ResourceCPU] = resources.Cpu + template.Spec.Containers[i].Resources.Requests[corev1.ResourceCPU] = resources.Cpu + } + if !resources.Memory.IsZero() { + template.Spec.Containers[i].Resources.Limits[corev1.ResourceMemory] = resources.Memory + template.Spec.Containers[i].Resources.Requests[corev1.ResourceMemory] = resources.Memory + } + template.Spec.Containers[i].Resources.Limits[corev1.ResourceName("nvidia.com/gpu")] = resources.Gpu + template.Spec.Containers[i].Resources.Requests[corev1.ResourceName("nvidia.com/gpu")] = resources.Gpu + } +} + func (s *NervexServer) Delete(w http.ResponseWriter, r *http.Request) { } diff --git a/server/main.go b/server/main.go index 141daf3..fa4bc60 100644 --- a/server/main.go +++ b/server/main.go @@ -13,7 +13,7 @@ import ( nervexv1alpha1 "go-sensephoenix.sensetime.com/nervex-operator/api/v1alpha1" serverdynamic "go-sensephoenix.sensetime.com/nervex-operator/server/dynamic" - "go-sensephoenix.sensetime.com/nervex-operator/server/http" + serverhttp "go-sensephoenix.sensetime.com/nervex-operator/server/http" ) var ( @@ -43,7 +43,7 @@ func main() { Version: nervexv1alpha1.GroupVersion.Version, Resource: "actorlearnerconfigs", } - alconfigDyInformer := serverdynamic.NewDynamicInformer(dynamicClient, alconfigGVR) + alconfigDyInformer := serverdynamic.NewDynamicInformer(dynamicClient, alconfigGVR, nil) serverdynamic.AddEventHandlers(alconfigDyInformer) // add NervexJob informer @@ -52,7 +52,7 @@ func main() { Version: nervexv1alpha1.GroupVersion.Version, Resource: "nervexjobs", } - njDyInformer := serverdynamic.NewDynamicInformer(dynamicClient, njGVR) + njDyInformer := serverdynamic.NewDynamicInformer(dynamicClient, njGVR, nil) serverdynamic.AddEventHandlers(njDyInformer) // start dynamic informer @@ -68,7 +68,7 @@ func main() { logger := zap.New(zap.UseFlagOptions(&opts)) - nervexServer := http.NewNervexServer(kubeClient, dynamicClient, logger, alconfigDyInformer, njDyInformer, alconfigName) + nervexServer := serverhttp.NewNervexServer(kubeClient, dynamicClient, logger, alconfigDyInformer, njDyInformer, alconfigName) if err := nervexServer.Start(); err != nil { log.Fatalf("Failed to start NervexServer: %v", err) diff --git a/util/const.go b/util/const.go index e683def..422a04a 100644 --- a/util/const.go +++ b/util/const.go @@ -19,8 +19,8 @@ const ( DefaultActorPortName = "actor-port" DefaultLearnerPortName = "learner-port" - DefaultAggregatorPortName = "aggregator-port" - DefaultCoordinatorPortName = "coordinator-port" + DefaultAggregatorPortName = "aggt-port" + DefaultCoordinatorPortName = "cood-port" DefaultActorPort = 22270 DefaultLearnerPort = 22271 diff --git a/util/util.go b/util/util.go index 09388d5..3fec828 100644 --- a/util/util.go +++ b/util/util.go @@ -25,8 +25,8 @@ func GetObjectFromUnstructured(obj interface{}, dest interface{}) error { return nil } -func GetPortFromPod(pod *corev1.Pod, containerName, portName string) (int32, bool) { - for _, c := range pod.Spec.Containers { +func GetPortFromPodTemplate(template *corev1.PodTemplateSpec, containerName, portName string) (int32, bool) { + for _, c := range template.Spec.Containers { if c.Name != containerName { continue } @@ -39,6 +39,22 @@ func GetPortFromPod(pod *corev1.Pod, containerName, portName string) (int32, boo return -1, false } +func SetPodTemplatePort(template *corev1.PodTemplateSpec, containerName, portName string, port int32) { + for i := range template.Spec.Containers { + if template.Spec.Containers[i].Name != containerName { + continue + } + if template.Spec.Containers[i].Ports == nil { + template.Spec.Containers[i].Ports = []corev1.ContainerPort{} + } + portObj := corev1.ContainerPort{ + Name: portName, + ContainerPort: port, + } + template.Spec.Containers[i].Ports = append(template.Spec.Containers[i].Ports, portObj) + } +} + func GenLabels(jobName string) map[string]string { groupName := nervexv1alpha1.GroupVersion.Group return map[string]string{ @@ -68,3 +84,34 @@ func BuildPodFromTemplate(template *corev1.PodTemplateSpec) *corev1.Pod { pod.ObjectMeta = *template.ObjectMeta.DeepCopy() return pod } + +func SetPodTemplateEnv(template *corev1.PodTemplateSpec, envs map[string]string) { + // add env + for i := range template.Spec.Containers { + if len(template.Spec.Containers[i].Env) == 0 { + template.Spec.Containers[i].Env = make([]corev1.EnvVar, 0) + } + for k, v := range envs { + env := corev1.EnvVar{ + Name: k, + Value: v, + } + template.Spec.Containers[i].Env = append(template.Spec.Containers[i].Env, env) + } + } +} + +func BuildService(labels map[string]string, port int32, portName string) *corev1.Service { + return &corev1.Service{ + Spec: corev1.ServiceSpec{ + ClusterIP: "None", + Selector: labels, + Ports: []corev1.ServicePort{ + { + Port: port, + Name: portName, + }, + }, + }, + } +} -- GitLab