提交 7de9c945 编写于 作者: L liqingping

feat: implement nervex-server add function

1. implement nervex-server add function
2. move common function to util
3. add dockerfile for server
4. rm useless config
上级 63ba161d
......@@ -22,6 +22,5 @@ RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o manager
FROM registry.sensetime.com/cloudnative4ai/ubi:v1.0.0
WORKDIR /
COPY --from=builder /workspace/manager .
USER 65532:65532
ENTRYPOINT ["/manager"]
FROM registry.sensetime.com/cloudnative4ai/ubi:v1.0.0
WORKDIR /
COPY /bin/nervex-server .
ENTRYPOINT ["/nervex-server"]
......@@ -17,6 +17,7 @@ endif
# Image URL to use all building/pushing image targets
IMG ?= registry.sensetime.com/cloudnative4ai/nervex-operator:${VERSION}
SERVER_IMG ?= registry.sensetime.com/cloudnative4ai/nervex-server:${VERSION}
# Produce CRDs that work back to Kubernetes 1.11 (no version conversion)
CRD_OPTIONS ?= "crd:trivialVersions=true,preserveUnknownFields=false"
......@@ -52,6 +53,11 @@ manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and Cust
./hack/update-image-tags.sh config/manager ${MASTER_VERSION}
./hack/update-version.sh ${MASTER_VERSION}
dev-manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and CustomResourceDefinition objects.
$(CONTROLLER_GEN) $(CRD_OPTIONS) rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
./hack/update-image-tags.sh config/manager ${VERSION}
./hack/update-version.sh ${VERSION}
generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..."
......@@ -75,18 +81,21 @@ test: manifests generate fmt vet ## Run tests.
build: generate fmt vet ## Build manager binary.
go build -o bin/manager main.go
go build -o bin/nervex-server server/main.go
run: manifests generate fmt vet ## Run a controller from your host.
go run ./main.go
dev-images: build
docker build -t ${IMG} -f Dockerfile.dev .
docker build -t ${SERVER_IMG} -f Dockerfile.server .
docker-build: ## Build docker image with the manager.
docker build -t ${IMG} .
docker-push: ## Push docker image with the manager.
docker push ${IMG}
docker push ${SERVER_IMG}
##@ Deployment
......@@ -100,9 +109,15 @@ deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in
cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG}
$(KUSTOMIZE) build config/default | kubectl apply -f -
dev-deploy: dev-manifests kustomize ## Deploy controller to the K8s cluster specified in ~/.kube/config.
cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG}
$(KUSTOMIZE) build config/default | kubectl apply -f -
undeploy: ## Undeploy controller from the K8s cluster specified in ~/.kube/config.
$(KUSTOMIZE) build config/default | kubectl delete -f -
dev-undeploy: ## Undeploy controller from the K8s cluster specified in ~/.kube/config.
$(KUSTOMIZE) build config/default | kubectl delete -f -
CONTROLLER_GEN = $(shell pwd)/bin/controller-gen
controller-gen: ## Download controller-gen locally if necessary.
......
# The following manifests contain a self-signed issuer CR and a certificate CR.
# More document can be found at https://docs.cert-manager.io
# WARNING: Targets CertManager v1.0. Check https://cert-manager.io/docs/installation/upgrading/ for breaking changes.
apiVersion: cert-manager.io/v1
kind: Issuer
metadata:
name: selfsigned-issuer
namespace: system
spec:
selfSigned: {}
---
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
name: serving-cert # this name should match the one appeared in kustomizeconfig.yaml
namespace: system
spec:
# $(SERVICE_NAME) and $(SERVICE_NAMESPACE) will be substituted by kustomize
dnsNames:
- $(SERVICE_NAME).$(SERVICE_NAMESPACE).svc
- $(SERVICE_NAME).$(SERVICE_NAMESPACE).svc.cluster.local
issuerRef:
kind: Issuer
name: selfsigned-issuer
secretName: webhook-server-cert # this secret will not be prefixed, since it's not managed by kustomize
resources:
- certificate.yaml
configurations:
- kustomizeconfig.yaml
# This configuration is for teaching kustomize how to update name ref and var substitution
nameReference:
- kind: Issuer
group: cert-manager.io
fieldSpecs:
- kind: Certificate
group: cert-manager.io
path: spec/issuerRef/name
varReference:
- kind: Certificate
group: cert-manager.io
path: spec/commonName
- kind: Certificate
group: cert-manager.io
path: spec/dnsNames
......@@ -20,16 +20,16 @@ spec:
versions:
- additionalPrinterColumns:
- jsonPath: .status.Actors.Total
name: TotalActors
name: Total-Actors
type: integer
- jsonPath: .status.Actors.Active
name: ActiveActors
name: Active-Actors
type: integer
- jsonPath: .status.Learners.Total
name: TotalLearners
name: Total-Learners
type: integer
- jsonPath: .status.Learners.Active
name: ActiveLearners
name: Active-Learners
type: integer
name: v1alpha1
schema:
......@@ -6,7 +6,7 @@ namespace: nervex-system
# "wordpress" becomes "alices-wordpress".
# Note that it should also match with the prefix (text before '-') of the namespace
# field above.
namePrefix: nervex-
# namePrefix: nervex-
# Labels to add to all resources and selectors.
#commonLabels:
......
......@@ -3,22 +3,12 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: controller-manager
namespace: system
name: nervex-operator
namespace: nervex-system
spec:
template:
spec:
containers:
- name: kube-rbac-proxy
image: gcr.io/kubebuilder/kube-rbac-proxy:v0.8.0
args:
- "--secure-listen-address=0.0.0.0:8443"
- "--upstream=http://127.0.0.1:8080/"
- "--logtostderr=true"
- "--v=10"
ports:
- containerPort: 8443
name: https
- name: manager
args:
- "--health-probe-bind-address=:8081"
......
apiVersion: apps/v1
kind: Deployment
metadata:
name: controller-manager
namespace: system
name: nervex-operator
namespace: nervex-system
spec:
template:
spec:
......
resources:
- manager.yaml
- priority_class.yaml
- nervex_server.yaml
generatorOptions:
disableNameSuffixHash: true
......@@ -14,4 +15,4 @@ kind: Kustomization
images:
- name: controller
newName: registry.sensetime.com/cloudnative4ai/nervex-operator
newTag: v0.0.1-alpha.0-962d399c
newTag: v0.0.1-alpha.0-63ba161d
......@@ -2,25 +2,25 @@ apiVersion: v1
kind: Namespace
metadata:
labels:
control-plane: controller-manager
control-plane: nervex-operator
name: nervex-system
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: controller-manager
name: nervex-operator
namespace: nervex-system
labels:
control-plane: controller-manager
control-plane: nervex-operator
spec:
selector:
matchLabels:
control-plane: controller-manager
control-plane: nervex-operator
replicas: 1
template:
metadata:
labels:
control-plane: controller-manager
control-plane: nervex-operator
spec:
containers:
- command:
......
apiVersion: apps/v1
kind: Deployment
metadata:
name: nervex-server
namespace: nervex-system
labels:
control-plane: nervex-server
spec:
selector:
matchLabels:
control-plane: nervex-server
replicas: 1
template:
metadata:
labels:
control-plane: nervex-server
spec:
containers:
- command:
- /nervex-server
image: registry.sensetime.com/cloudnative4ai/nervex-server:v0.0.1-alpha.0
imagePullPolicy: Always
name: server
securityContext:
allowPrivilegeEscalation: false
livenessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 15
periodSeconds: 20
resources:
limits:
cpu: 100m
memory: 30Mi
requests:
cpu: 100m
memory: 20Mi
terminationGracePeriodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: nervex-server
namespace: nervex-system
spec:
selector:
control-plane: nervex-server
ports:
- protocol: TCP
port: 8080
targetPort: 8080
# Prometheus Monitor Service (Metrics)
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
labels:
control-plane: controller-manager
name: controller-manager-metrics-monitor
namespace: nervex-system
spec:
endpoints:
- path: /metrics
port: https
selector:
matchLabels:
control-plane: controller-manager
......@@ -2,13 +2,13 @@ apiVersion: v1
kind: Service
metadata:
labels:
control-plane: controller-manager
name: controller-manager-metrics-service
control-plane: nervex-operator
name: nervex-operator-metrics-service
namespace: nervex-system
spec:
ports:
- name: https
port: 8443
targetPort: https
targetPort: 8080
selector:
control-plane: controller-manager
control-plane: nervex-operator
......@@ -30,11 +30,9 @@ spec:
requests:
cpu: 2
memory: 5Gi
nvidia.com/gpu: 1
limits:
cpu: 2
memory: 5Gi
nvidia.com/gpu: 1
volumeMounts:
- name: config
mountPath: /data/nervex
......@@ -71,11 +69,9 @@ spec:
requests:
cpu: 2
memory: 5Gi
nvidia.com/gpu: 1
limits:
cpu: 2
memory: 5Gi
nvidia.com/gpu: 1
volumeMounts:
- name: config
mountPath: /data/nervex
......@@ -101,11 +97,9 @@ spec:
requests:
cpu: 2
memory: 5Gi
nvidia.com/gpu: 1
limits:
cpu: 2
memory: 5Gi
nvidia.com/gpu: 1
env:
- name: LC_ALL
value: "en_US.utf-8"
......@@ -121,7 +115,7 @@ spec:
cd /data/nervex/test_atari/test3;
python3 -u -c 'import os; import nervex.entry.parallel_entry as pe; pe.launch_learner_aggregator(filename="atari_impala_default_config.py.pkl", name="aggregator{}".format(os.environ["HOSTNAME"].split("-")[-1]) )';
ports:
- name: aggregator-port
- name: aggt-port
containerPort: 22272
volumeMounts:
- name: config
......
......@@ -26,7 +26,8 @@ spec:
python3 -u -c 'import nervex.entry.parallel_entry as pe; pe.launch_coordinator(filename="atari_impala_default_config.py.pkl")';
while true; do sleep 30; done;
ports:
- containerPort: 22273
- name: cood-port
containerPort: 22273
resources:
requests:
cpu: 2
......
......@@ -49,17 +49,21 @@ func (r *NervexJobReconciler) createPod(ctx context.Context, job *nervexv1alpha1
labels[nervexutil.ReplicaTypeLabel] = replicaType
nervexutil.AddLabelsToPodTemplate(podTemplate, labels)
// add env
for i := range podTemplate.Spec.Containers {
if len(podTemplate.Spec.Containers[i].Env) == 0 {
podTemplate.Spec.Containers[i].Env = make([]corev1.EnvVar, 0)
}
podTemplate.Spec.Containers[i].Env = append(podTemplate.Spec.Containers[i].Env, corev1.EnvVar{
Name: "KUBERNETES_POD_NAMESPACE",
Value: podTemplate.Namespace,
})
// get port
port, ok := nervexutil.GetPortFromPodTemplate(podTemplate, nervexutil.DefaultCoordinatorContainerName, nervexutil.DefaultCoordinatorPortName)
if !ok {
port = nervexutil.DefaultCoordinatorPort
log.Info("no port found, use default port", "port", port)
nervexutil.SetPodTemplatePort(podTemplate, nervexutil.DefaultCoordinatorContainerName, nervexutil.DefaultCoordinatorPortName, port)
}
// add env
envs := make(map[string]string, 0)
envs["KUBERNETES_POD_NAMESPACE"] = podTemplate.Namespace
envs["KUBERNETES_POD_NAME"] = podTemplate.Name
envs["COORDINATOR_PORT"] = fmt.Sprintf("%d", port)
nervexutil.SetPodTemplateEnv(podTemplate, envs)
// set owner reference
ownRefer := metav1.OwnerReference{
APIVersion: job.APIVersion,
......
// Package main is the entry point stub for this binary.
package main
// main intentionally does nothing yet.
// NOTE(review): confirm whether this placeholder is still needed —
// the server startup logic appears to live in server/main.go.
func main() {
}
package dynamic
import (
"log"
"time"
corev1 "k8s.io/api/core/v1"
......@@ -9,15 +10,14 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/log"
)
var (
resyncPeriod = 30 * time.Second
)
func NewDynamicInformer(dynamicClient dynamic.Interface, gvr schema.GroupVersionResource) cache.SharedIndexInformer {
dif := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, resyncPeriod, corev1.NamespaceAll, nil)
// NewDynamicInformer builds a shared index informer for the given
// GroupVersionResource, watching all namespaces with the package-level
// resyncPeriod. tweakListOptions may be nil; when non-nil it filters
// the informer's list/watch requests.
func NewDynamicInformer(dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, tweakListOptions dynamicinformer.TweakListOptionsFunc) cache.SharedIndexInformer {
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, resyncPeriod, corev1.NamespaceAll, tweakListOptions)
	return factory.ForResource(gvr).Informer()
}
......@@ -27,7 +27,7 @@ func AddEventHandlers(s cache.SharedIndexInformer) {
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// on add alconfig
log.Log.Info("new object", obj.(*unstructured.Unstructured).GetName())
log.Printf("new object: %s/%s", obj.(*unstructured.Unstructured).GetNamespace(), obj.(*unstructured.Unstructured).GetName())
},
UpdateFunc: func(old, new interface{}) {
// on update alconfig
......
package http
import (
"net/http"
"sync/atomic"
)
// healthz is a liveness probe.
func healthz(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
}
// readyz is a readiness probe.
func readyz(isReady *atomic.Value) http.HandlerFunc {
return func(w http.ResponseWriter, _ *http.Request) {
if isReady == nil || !isReady.Load().(bool) {
http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
}
}
......@@ -35,10 +35,10 @@ type NervexJobRequest struct {
}
type ResourceQuantity struct {
Number int `json:"number"`
CPUs resource.Quantity `json:"cpus"`
GPUs resource.Quantity `json:"gpus"`
Memory resource.Quantity `json:"memory"`
Replicas int `json:"replicas"`
Cpu resource.Quantity `json:"cpus"`
Gpu resource.Quantity `json:"gpus"`
Memory resource.Quantity `json:"memory"`
}
type NervexJobResponse struct {
......@@ -70,6 +70,7 @@ func (s *NervexServer) Start() error {
log := s.Log.WithName("NervexServer")
http.HandleFunc("/api/v1alpha1/add", s.Add)
http.HandleFunc("/api/v1alpha1/delete", s.Delete)
http.HandleFunc("/healthz", healthz)
log.Info("Start listening on :8080")
http.ListenAndServe(":8080", nil)
......@@ -77,7 +78,7 @@ func (s *NervexServer) Start() error {
}
func (s *NervexServer) Add(w http.ResponseWriter, r *http.Request) {
log := s.Log.WithName("NernexServer")
log := s.Log.WithName("NervexServer")
// parse request body
var njreq NervexJobRequest
......@@ -91,7 +92,8 @@ func (s *NervexServer) Add(w http.ResponseWriter, r *http.Request) {
// get ALConfig
obj, exists, err := s.alconfigDyInformer.GetIndexer().GetByKey(s.alconfig)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
errMsg := fmt.Sprintf("failed to get ALConfig: %s", err)
http.Error(w, errMsg, http.StatusInternalServerError)
return
}
......@@ -110,7 +112,8 @@ func (s *NervexServer) Add(w http.ResponseWriter, r *http.Request) {
// get coordinator
coordinator, err := s.KubeClient.CoreV1().Pods(njreq.Namespace).Get(context.Background(), njreq.Coordinator, metav1.GetOptions{})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
errMsg := fmt.Sprintf("failed to get coordinator: %s", err)
http.Error(w, errMsg, http.StatusInternalServerError)
return
}
......@@ -131,7 +134,8 @@ func (s *NervexServer) Add(w http.ResponseWriter, r *http.Request) {
njKey := fmt.Sprintf("%s/%s", njreq.Namespace, ownRefer.Name)
obj, exists, err = s.njDyInformer.GetIndexer().GetByKey(njKey)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
errMsg := fmt.Sprintf("failed to get NervexJob: %s", err)
http.Error(w, errMsg, http.StatusInternalServerError)
return
}
......@@ -146,7 +150,8 @@ func (s *NervexServer) Add(w http.ResponseWriter, r *http.Request) {
// create actors and learners
actors, learners, err := s.createActorsAndLearnersFromALConfig(alconfig, &njreq, ownRefer)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
errMsg := fmt.Sprintf("failed create actors and learners: %s", err)
http.Error(w, errMsg, http.StatusInternalServerError)
return
}
......@@ -160,7 +165,8 @@ func (s *NervexServer) Add(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
repJson, err := json.Marshal(rep)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
errMsg := fmt.Sprintf("failed to marshal json: %s", err)
http.Error(w, errMsg, http.StatusInternalServerError)
return
}
w.Write(repJson)
......@@ -173,7 +179,7 @@ func (s *NervexServer) createActorsAndLearnersFromALConfig(
// create actors
actorTemplate := alconfig.Spec.Actor.Template
actors, err := s.createPodsAndServices(&actorTemplate, ownRefer, njreq.Namespace, njreq.Coordinator, njreq.Actors.Number,
actors, err := s.createPodsAndServices(&actorTemplate, ownRefer, njreq,
nervexutil.ActorName, nervexutil.DefaultActorContainerName, nervexutil.DefaultActorPortName, nervexutil.DefaultActorPort)
if err != nil {
......@@ -182,7 +188,7 @@ func (s *NervexServer) createActorsAndLearnersFromALConfig(
// create learners
learnerTemplate := alconfig.Spec.Learner.Template
learners, err := s.createPodsAndServices(&learnerTemplate, ownRefer, njreq.Namespace, njreq.Coordinator, njreq.Learners.Number,
learners, err := s.createPodsAndServices(&learnerTemplate, ownRefer, njreq,
nervexutil.LearnerName, nervexutil.DefaultLearnerContainerName, nervexutil.DefaultLearnerPortName, nervexutil.DefaultLearnerPort)
if err != nil {
......@@ -195,46 +201,58 @@ func (s *NervexServer) createActorsAndLearnersFromALConfig(
func (s *NervexServer) createPodsAndServices(
template *corev1.PodTemplateSpec,
ownRefer metav1.OwnerReference,
ns, coordinator string,
replicas int,
njreq *NervexJobRequest,
replicaType, containerName, portName string, defaultPort int32) ([]string, error) {
log := s.Log.WithName("NervexServer")
coordinator := njreq.Coordinator
ns := njreq.Namespace
portEnv := ""
resources := ResourceQuantity{}
if replicaType == nervexutil.ActorName {
resources = njreq.Actors
portEnv = "ACTOR_PORT"
} else if replicaType == nervexutil.LearnerName {
resources = njreq.Learners
portEnv = "LEARNER_PORT"
}
// add labels to pod template
labels := nervexutil.GenLabels(ownRefer.Name)
labels[nervexutil.ReplicaTypeLabel] = replicaType
nervexutil.AddLabelsToPodTemplate(template, labels)
// generate pod from template
pod := nervexutil.BuildPodFromTemplate(template.DeepCopy())
pod.GenerateName = fmt.Sprintf("%s-%s-", coordinator, replicaType)
pod.Namespace = ns
pod.SetOwnerReferences([]metav1.OwnerReference{ownRefer})
// setup pod template
template.GenerateName = fmt.Sprintf("%s-%s-", coordinator, replicaType)
template.SetOwnerReferences([]metav1.OwnerReference{ownRefer})
template.Namespace = ns
// set pod resource
SetPodTemplateResources(template, resources, containerName)
// get pod port
port, ok := nervexutil.GetPortFromPod(pod, containerName, portName)
port, ok := nervexutil.GetPortFromPodTemplate(template, containerName, portName)
if !ok {
port = defaultPort
log.Info("no port found, use default port", "port", port)
nervexutil.SetPodTemplatePort(template, containerName, portName, port)
}
// add env
envs := make(map[string]string, 0)
envs[portEnv] = fmt.Sprintf("%d", port)
nervexutil.SetPodTemplateEnv(template, envs)
// build pod
pod := nervexutil.BuildPodFromTemplate(template.DeepCopy())
// build service
svc := corev1.Service{
Spec: corev1.ServiceSpec{
ClusterIP: "None",
Selector: labels,
Ports: []corev1.ServicePort{
{
Port: port,
Name: portName,
},
},
},
}
svc := nervexutil.BuildService(labels, port, portName)
svc.Labels = labels
svc.SetOwnerReferences([]metav1.OwnerReference{ownRefer})
results := []string{}
// create pods and services
for i := 0; i < replicas; i++ {
for i := 0; i < resources.Replicas; i++ {
tempPod := pod.DeepCopy()
newPod, err := s.KubeClient.CoreV1().Pods(ns).Create(context.Background(), tempPod, metav1.CreateOptions{})
if err != nil {
......@@ -255,6 +273,32 @@ func (s *NervexServer) createPodsAndServices(
return results, nil
}
// SetPodTemplateResources writes the requested CPU, memory and GPU
// quantities onto the named container's resource requests and limits.
// CPU and memory are applied only when non-zero (so template defaults
// survive an empty request), while the nvidia.com/gpu quantity is
// always written — even when it is zero.
func SetPodTemplateResources(template *corev1.PodTemplateSpec, resources ResourceQuantity, containerName string) {
	for idx := range template.Spec.Containers {
		container := &template.Spec.Containers[idx]
		if container.Name != containerName {
			continue
		}
		// Lazily initialize the resource maps before writing into them.
		if container.Resources.Limits == nil {
			container.Resources.Limits = make(corev1.ResourceList, 0)
		}
		if container.Resources.Requests == nil {
			container.Resources.Requests = make(corev1.ResourceList, 0)
		}
		// cpu and memory must not be zero
		if !resources.Cpu.IsZero() {
			container.Resources.Limits[corev1.ResourceCPU] = resources.Cpu
			container.Resources.Requests[corev1.ResourceCPU] = resources.Cpu
		}
		if !resources.Memory.IsZero() {
			container.Resources.Limits[corev1.ResourceMemory] = resources.Memory
			container.Resources.Requests[corev1.ResourceMemory] = resources.Memory
		}
		gpu := corev1.ResourceName("nvidia.com/gpu")
		container.Resources.Limits[gpu] = resources.Gpu
		container.Resources.Requests[gpu] = resources.Gpu
	}
}
// Delete handles /api/v1alpha1/delete requests.
// NOTE(review): not yet implemented — the handler currently does
// nothing and lets the net/http server return an implicit 200 with
// an empty body.
func (s *NervexServer) Delete(w http.ResponseWriter, r *http.Request) {
}
......@@ -13,7 +13,7 @@ import (
nervexv1alpha1 "go-sensephoenix.sensetime.com/nervex-operator/api/v1alpha1"
serverdynamic "go-sensephoenix.sensetime.com/nervex-operator/server/dynamic"
"go-sensephoenix.sensetime.com/nervex-operator/server/http"
serverhttp "go-sensephoenix.sensetime.com/nervex-operator/server/http"
)
var (
......@@ -43,7 +43,7 @@ func main() {
Version: nervexv1alpha1.GroupVersion.Version,
Resource: "actorlearnerconfigs",
}
alconfigDyInformer := serverdynamic.NewDynamicInformer(dynamicClient, alconfigGVR)
alconfigDyInformer := serverdynamic.NewDynamicInformer(dynamicClient, alconfigGVR, nil)
serverdynamic.AddEventHandlers(alconfigDyInformer)
// add NervexJob informer
......@@ -52,7 +52,7 @@ func main() {
Version: nervexv1alpha1.GroupVersion.Version,
Resource: "nervexjobs",
}
njDyInformer := serverdynamic.NewDynamicInformer(dynamicClient, njGVR)
njDyInformer := serverdynamic.NewDynamicInformer(dynamicClient, njGVR, nil)
serverdynamic.AddEventHandlers(njDyInformer)
// start dynamic informer
......@@ -68,7 +68,7 @@ func main() {
logger := zap.New(zap.UseFlagOptions(&opts))
nervexServer := http.NewNervexServer(kubeClient, dynamicClient, logger, alconfigDyInformer, njDyInformer, alconfigName)
nervexServer := serverhttp.NewNervexServer(kubeClient, dynamicClient, logger, alconfigDyInformer, njDyInformer, alconfigName)
if err := nervexServer.Start(); err != nil {
log.Fatalf("Failed to start NervexServer: %v", err)
......
......@@ -19,8 +19,8 @@ const (
DefaultActorPortName = "actor-port"
DefaultLearnerPortName = "learner-port"
DefaultAggregatorPortName = "aggregator-port"
DefaultCoordinatorPortName = "coordinator-port"
DefaultAggregatorPortName = "aggt-port"
DefaultCoordinatorPortName = "cood-port"
DefaultActorPort = 22270
DefaultLearnerPort = 22271
......
......@@ -25,8 +25,8 @@ func GetObjectFromUnstructured(obj interface{}, dest interface{}) error {
return nil
}
func GetPortFromPod(pod *corev1.Pod, containerName, portName string) (int32, bool) {
for _, c := range pod.Spec.Containers {
func GetPortFromPodTemplate(template *corev1.PodTemplateSpec, containerName, portName string) (int32, bool) {
for _, c := range template.Spec.Containers {
if c.Name != containerName {
continue
}
......@@ -39,6 +39,22 @@ func GetPortFromPod(pod *corev1.Pod, containerName, portName string) (int32, boo
return -1, false
}
// SetPodTemplatePort ensures the named container exposes portName with
// the given port number. If a port entry with the same name already
// exists on the container it is updated in place; otherwise a new entry
// is appended. The previous implementation always appended, which could
// produce duplicate port names — container port names must be unique
// within a pod per the Kubernetes API.
func SetPodTemplatePort(template *corev1.PodTemplateSpec, containerName, portName string, port int32) {
	for i := range template.Spec.Containers {
		if template.Spec.Containers[i].Name != containerName {
			continue
		}
		if template.Spec.Containers[i].Ports == nil {
			template.Spec.Containers[i].Ports = []corev1.ContainerPort{}
		}
		// Update an existing entry instead of appending a duplicate name.
		updated := false
		for j := range template.Spec.Containers[i].Ports {
			if template.Spec.Containers[i].Ports[j].Name == portName {
				template.Spec.Containers[i].Ports[j].ContainerPort = port
				updated = true
				break
			}
		}
		if updated {
			continue
		}
		portObj := corev1.ContainerPort{
			Name:          portName,
			ContainerPort: port,
		}
		template.Spec.Containers[i].Ports = append(template.Spec.Containers[i].Ports, portObj)
	}
}
func GenLabels(jobName string) map[string]string {
groupName := nervexv1alpha1.GroupVersion.Group
return map[string]string{
......@@ -68,3 +84,34 @@ func BuildPodFromTemplate(template *corev1.PodTemplateSpec) *corev1.Pod {
pod.ObjectMeta = *template.ObjectMeta.DeepCopy()
return pod
}
// SetPodTemplateEnv sets the given name/value pairs as environment
// variables on every container of the pod template. An existing
// variable with the same name is overwritten rather than appended
// again, so the resulting env list has no duplicate names — the
// previous implementation always appended, which could duplicate
// entries when the template (or a repeated call) already defined
// the variable.
// NOTE(review): Go map iteration order is random, so newly-added
// variables land in nondeterministic order; callers must not rely
// on env ordering.
func SetPodTemplateEnv(template *corev1.PodTemplateSpec, envs map[string]string) {
	for i := range template.Spec.Containers {
		if template.Spec.Containers[i].Env == nil {
			template.Spec.Containers[i].Env = make([]corev1.EnvVar, 0, len(envs))
		}
		for k, v := range envs {
			env := corev1.EnvVar{
				Name:  k,
				Value: v,
			}
			// Overwrite an existing entry of the same name; append otherwise.
			found := false
			for j := range template.Spec.Containers[i].Env {
				if template.Spec.Containers[i].Env[j].Name == k {
					template.Spec.Containers[i].Env[j] = env
					found = true
					break
				}
			}
			if !found {
				template.Spec.Containers[i].Env = append(template.Spec.Containers[i].Env, env)
			}
		}
	}
}
// BuildService constructs a headless Service (ClusterIP "None") that
// selects pods by the given labels and exposes a single named port.
func BuildService(labels map[string]string, port int32, portName string) *corev1.Service {
	svc := &corev1.Service{}
	svc.Spec.ClusterIP = "None"
	svc.Spec.Selector = labels
	svc.Spec.Ports = []corev1.ServicePort{
		{
			Name: portName,
			Port: port,
		},
	}
	return svc
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册