Commit c059e218, authored by liqingping

Merge branch 'feat/api-implement' into 'feat/crd-redefine'

feat: implement /v1alpha1/replicas GET POST DELETE api

See merge request platform/CloudNative4AI/cluster-lifecycle/nervex-operator!6
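
As a rough illustration of the new endpoint, here is a hypothetical Go client sketch. The request and response field names follow the NerveXJobRequest, ResourceQuantity, and Response types added in this merge request; the server address, port, the "collectors"/"learners" JSON keys, and the handler behaviour are assumptions for illustration only.

```go
// Hypothetical client sketch for the /v1alpha1/replicas endpoint added in this MR.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Request body loosely following the NerveXJobRequest / ResourceQuantity JSON
	// tags from this MR; the "collectors"/"learners" keys are assumptions.
	body := map[string]interface{}{
		"namespace":   "default",
		"coordinator": "nervexjob-example-coordinator",
		"collectors":  map[string]interface{}{"replicas": 2, "cpus": "1", "memory": "2Gi"},
		"learners":    map[string]interface{}{"replicas": 1, "cpus": "2", "gpus": "1", "memory": "4Gi"},
	}
	buf, err := json.Marshal(body)
	if err != nil {
		panic(err)
	}

	// POST asks the server to create collector/learner replicas for the NerveXJob
	// that owns the given coordinator; GET and DELETE use the same path.
	// The server address and port are assumptions.
	resp, err := http.Post("http://nervex-server.nervex-system:8080/v1alpha1/replicas",
		"application/json", bytes.NewReader(buf))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Decode the generic Response envelope defined in server/types.
	var r struct {
		Success bool        `json:"success"`
		Code    int         `json:"code"`
		Message string      `json:"message"`
		Data    interface{} `json:"data"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
		panic(err)
	}
	fmt.Printf("success=%v code=%d message=%q\n", r.Success, r.Code, r.Message)
}
```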
run:
# default concurrency is the number of available CPUs
concurrency: 4
# timeout for analysis, e.g. 30s, 5m, default is 1m
deadline: 10m
# exit code when at least one issue was found, default is 1
issues-exit-code: 1
# include test files or not, default is true
tests: true
skip-dirs:
- manifests # deploy phoenix-rubber yaml
- third_party # from go-ethereum
- _out # phoenix-rubber executable binary file
- doc # user tutorial
- deployment # deploy phoenix-rubber yaml
- config # the crd config yaml
- cluster # logging bash scripts
- vendor # third-party libraries
- api # auto-generated
- pkg/client # auto-generated
- example
- bin
# output configuration options
output:
# colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number"
format: colored-line-number
# print lines of code with issue, default is true
print-issued-lines: true
# print linter name in the end of issue text, default is true
print-linter-name: true
linters:
fast: true
enable:
- gofmt
- goimports
- golint
- deadcode
disable:
- gocyclo
- typecheck
- bodyclose
- gochecknoinits
- gochecknoglobals
- lll
- maligned
- unparam
- unused
- depguard
- dupl
- errcheck
- gas
- goconst
- gocritic
- gosec
- gosimple
- govet
- interfacer
- ineffassign
- megacheck
- misspell
- nakedret
- prealloc
- staticcheck
- structcheck
- stylecheck
- unconvert
- varcheck
......@@ -30,7 +30,7 @@ spec:
- "--metrics-bind-address=:8080"
- "--leader-elect"
image: registry.sensetime.com/cloudnative4ai/nervex-operator:v0.0.1-alpha.0
-imagePullPolicy: IfNotPresent
+imagePullPolicy: Always
name: manager
securityContext:
allowPrivilegeEscalation: false
......
......@@ -24,7 +24,7 @@ spec:
- "--lease-lock-namespace=nervex-system"
- "--lease-lock-name=nervex-server"
image: registry.sensetime.com/cloudnative4ai/nervex-server:v0.0.1-alpha.0
-imagePullPolicy: IfNotPresent
+imagePullPolicy: Always
name: server
securityContext:
allowPrivilegeEscalation: false
......
......@@ -8,7 +8,7 @@ spec:
spec:
containers:
- name: aggregator
-image: registry.sensetime.com/cloudnative4ai/nervex-mock:v0.0.4
+image: registry.sensetime.com/cloudnative4ai/nervex-mock:v0.0.5
imagePullPolicy: IfNotPresent
command: ["/bin/bash", "-c",]
args: ["until ping -c 1 $HOSTNAME.default ; do sleep 1 ; done ; sleep 5; python3 -u main.py aggregator -sl $HOSTNAME.default -sp $AGGREGATOR_PORT -sl $HOSTNAME.default -ml $HOSTNAME.default -mp 81"]
......
......@@ -10,8 +10,8 @@ spec:
spec:
containers:
- name: coordinator
-image: registry.sensetime.com/cloudnative4ai/nervex-mock:v0.0.4
-imagePullPolicy: IfNotPresent
+image: registry.sensetime.com/cloudnative4ai/nervex-mock:v0.0.5
+imagePullPolicy: Always
command: ["/bin/bash", "-c",]
args: ["python3 -u main.py coordinator -l $HOSTNAME -p $COORDINATOR_PORT"]
# args: ["sleep 3600"]
......@@ -20,10 +20,10 @@ spec:
spec:
containers:
- name: collector
-image: registry.sensetime.com/cloudnative4ai/nervex-mock:v0.0.4
-imagePullPolicy: IfNotPresent
+image: registry.sensetime.com/cloudnative4ai/nervex-mock:v0.0.5
+imagePullPolicy: Always
command: ["/bin/bash", "-c",]
args: ["until ping -c 1 $HOSTNAME.default ; do sleep 1 ; done ; sleep 5; python3 -u main.py actor -l $HOSTNAME.default -p $ACTOR_PORT"]
args: ["until ping -c 1 $HOSTNAME.default ; do sleep 1 ; done ; sleep 10; python3 -u main.py collector -l $HOSTNAME.default -p $COLLECTOR_PORT"]
ports:
- name: collector
containerPort: 80
......@@ -32,10 +32,10 @@ spec:
spec:
containers:
- name: learner
-image: registry.sensetime.com/cloudnative4ai/nervex-mock:v0.0.4
-imagePullPolicy: IfNotPresent
+image: registry.sensetime.com/cloudnative4ai/nervex-mock:v0.0.5
+imagePullPolicy: Always
command: ["/bin/bash", "-c",]
args: ["until ping -c 1 $HOSTNAME.default ; do sleep 1 ; done ; sleep 5; python3 -u main.py learner -l $HOSTNAME.default -p $LEARNER_PORT"]
args: ["until ping -c 1 $HOSTNAME.default ; do sleep 1 ; done ; sleep 10; python3 -u main.py learner -l $HOSTNAME.default -p $LEARNER_PORT"]
ports:
- name: learner
containerPort: 80
......@@ -18,13 +18,13 @@ var (
ResyncPeriod = 30 * time.Second
)
-type DynamicInformers struct {
+type Informers struct {
NJInformer informers.GenericInformer
AGInformer informers.GenericInformer
PodInformer informers.GenericInformer
}
-func NewDynamicInformer(dif dynamicinformer.DynamicSharedInformerFactory) DynamicInformers {
+func NewDynamicInformer(dif dynamicinformer.DynamicSharedInformerFactory) Informers {
// add ALConfig informer
aggregatorGVR := schema.GroupVersionResource{
Group: nervexv1alpha1.GroupVersion.Group,
......@@ -45,7 +45,7 @@ func NewDynamicInformer(dif dynamicinformer.DynamicSharedInformerFactory) Dynami
Version: corev1.SchemeGroupVersion.Version,
Resource: "pods",
}
-dyi := DynamicInformers{
+dyi := Informers{
NJInformer: dif.ForResource(njGVR),
AGInformer: dif.ForResource(aggregatorGVR),
PodInformer: dif.ForResource(podGVR),
......
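
The DynamicSharedInformerFactory passed to NewDynamicInformer is created by the caller. A minimal wiring sketch follows; only NewDynamicInformer, ResyncPeriod, and the server/dynamic import path come from this merge request, while the in-cluster configuration and the main() wrapper are assumptions for illustration.

```go
// Minimal sketch: build a dynamic client, create the shared informer factory,
// hand it to NewDynamicInformer, then start and sync the informers.
package main

import (
	"context"

	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/rest"

	serverdynamic "go-sensephoenix.sensetime.com/nervex-operator/server/dynamic"
)

func main() {
	cfg, err := rest.InClusterConfig() // assumes the server runs inside the cluster
	if err != nil {
		panic(err)
	}
	client := dynamic.NewForConfigOrDie(cfg)

	// Resync period matches the ResyncPeriod declared in server/dynamic.
	factory := dynamicinformer.NewDynamicSharedInformerFactory(client, serverdynamic.ResyncPeriod)
	dyi := serverdynamic.NewDynamicInformer(factory)

	ctx := context.Background()
	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	_ = dyi // the warmed-up informers can now be handed to the HTTP handlers
}
```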
This diff is collapsed.
package k8s
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
nervexv1alpha1 "go-sensephoenix.sensetime.com/nervex-operator/api/v1alpha1"
"go-sensephoenix.sensetime.com/nervex-operator/server/dynamic"
servertypes "go-sensephoenix.sensetime.com/nervex-operator/server/types"
nervexutil "go-sensephoenix.sensetime.com/nervex-operator/utils"
)
func GetNerveXJob(dyi dynamic.Informers, namespace, coordinatorName string) (*nervexv1alpha1.NerveXJob, error) {
// get coordinator
coorKey := nervexutil.NamespacedName(namespace, coordinatorName)
coordinator, err := GetPodByKey(dyi, coorKey)
if err != nil {
return nil, err
}
var ownRefer metav1.OwnerReference
ownRefers := coordinator.GetOwnerReferences()
ownByNerveX := false
for _, ref := range ownRefers {
if ref.Kind == nervexv1alpha1.KindNerveXJob {
ownRefer = ref
ownByNerveX = true
}
}
if !ownByNerveX {
errMsg := fmt.Sprintf("coordinator %s is not owned by any NerveXJob", coordinatorName)
return nil, &servertypes.NerveXError{Type: servertypes.ErrorNotFound, Message: errMsg}
}
// get NerveXJob
njKey := nervexutil.NamespacedName(namespace, ownRefer.Name)
nvxJob, err := GetNerveXJobByKey(dyi, njKey)
if err != nil {
return nil, err
}
return nvxJob, nil
}
func GetNerveXJobByKey(dyi dynamic.Informers, key string) (*nervexv1alpha1.NerveXJob, error) {
obj, exists, err := dyi.NJInformer.Informer().GetIndexer().GetByKey(key)
if err != nil {
errMsg := fmt.Sprintf("failed to get NerveXJob: %s", err)
return nil, fmt.Errorf(errMsg)
}
if !exists {
errMsg := fmt.Sprintf("NerveXJob: %s not exists in cache", key)
return nil, &servertypes.NerveXError{Type: servertypes.ErrorNotFound, Message: errMsg}
}
nvxUn := obj.(*unstructured.Unstructured)
var nvxJob nervexv1alpha1.NerveXJob
err = runtime.DefaultUnstructuredConverter.FromUnstructured(nvxUn.UnstructuredContent(), &nvxJob)
if err != nil {
errMsg := fmt.Sprintf("failed to convert unstructured: %s", nvxUn.UnstructuredContent())
return nil, fmt.Errorf(errMsg)
}
return &nvxJob, nil
}
func CreateCollectorsAndLearnersForNerveXJob(
kubeClient *kubernetes.Clientset,
njreq *servertypes.NerveXJobRequest,
job *nervexv1alpha1.NerveXJob) ([]string, []string, error) {
// build owner reference
ownRefer := metav1.OwnerReference{
APIVersion: job.APIVersion,
Kind: job.Kind,
Name: job.Name,
UID: job.GetUID(),
Controller: func(c bool) *bool { return &c }(true),
}
// create collectors
collectorTemplate := job.Spec.Collector.Template
collectors, err := CreatePodsAndServices(kubeClient, &collectorTemplate, ownRefer, njreq,
nervexutil.CollectorName, nervexutil.DefaultCollectorContainerName, nervexutil.DefaultCollectorPortName, nervexutil.DefaultCollectorPort)
if err != nil {
return collectors, nil, err
}
// create learners
learnerTemplate := job.Spec.Learner.Template
learners, err := CreatePodsAndServices(kubeClient, &learnerTemplate, ownRefer, njreq,
nervexutil.LearnerName, nervexutil.DefaultLearnerContainerName, nervexutil.DefaultLearnerPortName, nervexutil.DefaultLearnerPort)
if err != nil {
return collectors, learners, err
}
return collectors, learners, nil
}
package k8s
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"go-sensephoenix.sensetime.com/nervex-operator/server/dynamic"
servertypes "go-sensephoenix.sensetime.com/nervex-operator/server/types"
nervexutil "go-sensephoenix.sensetime.com/nervex-operator/utils"
)
func GetPodByKey(dyi dynamic.Informers, key string) (*corev1.Pod, error) {
obj, exists, err := dyi.PodInformer.Informer().GetIndexer().GetByKey(key)
if err != nil {
errMsg := fmt.Sprintf("failed to get pod: %s", err)
return nil, fmt.Errorf(errMsg)
}
if !exists {
errMsg := fmt.Sprintf("pod: %s not exists in cache", key)
return nil, &servertypes.NerveXError{Type: servertypes.ErrorNotFound, Message: errMsg}
}
podUn := obj.(*unstructured.Unstructured)
var pod corev1.Pod
err = runtime.DefaultUnstructuredConverter.FromUnstructured(podUn.UnstructuredContent(), &pod)
if err != nil {
errMsg := fmt.Sprintf("failed to convert unstructured: %s", podUn.UnstructuredContent())
return nil, fmt.Errorf(errMsg)
}
return &pod, nil
}
func CreatePodsAndServices(
kubeClient *kubernetes.Clientset,
template *corev1.PodTemplateSpec,
ownRefer metav1.OwnerReference,
njreq *servertypes.NerveXJobRequest,
replicaType, containerName, portName string, defaultPort int32) ([]string, error) {
ns := njreq.Namespace
resources := servertypes.ResourceQuantity{}
switch replicaType {
case nervexutil.CollectorName:
resources = njreq.Collectors
case nervexutil.LearnerName:
resources = njreq.Learners
}
results := []string{}
// create pods and services
for i := 0; i < resources.Replicas; i++ {
// build pod
pod, port, err := nervexutil.BuildPodFromTemplate(template.DeepCopy(), ownRefer, ns, replicaType, containerName, portName, defaultPort)
if err != nil {
return results, err
}
// set pod resources
SetPodResources(pod, resources, containerName)
// create pod
_, err = kubeClient.CoreV1().Pods(ns).Create(context.Background(), pod, metav1.CreateOptions{})
if err != nil {
if k8serrors.IsAlreadyExists(err) {
}
return results, err
}
// build service
svc := nervexutil.BuildService(pod.GetLabels(), port, portName)
svc.SetOwnerReferences([]metav1.OwnerReference{ownRefer})
svc.Name = pod.Name
// create service
_, err = kubeClient.CoreV1().Services(ns).Create(context.Background(), svc, metav1.CreateOptions{})
if err != nil {
if k8serrors.IsAlreadyExists(err) {
}
return results, err
}
result := nervexutil.ConcatURL(svc.Name, ns, port)
results = append(results, result)
}
return results, nil
}
func ListReplicaPodsWithSelector(dyi dynamic.Informers, ns string, labelSelector labels.Selector) (
collectors []*corev1.Pod, learners []*corev1.Pod, coordinator *corev1.Pod, aggregator *corev1.Pod, err error) {
// list pods that belong to the NerveXJob
pods, err := ListPodsWithSelector(dyi, ns, labelSelector)
if err != nil {
return
}
// filter out terminating pods since these pods are deleted
pods = FilterOutTerminatingPods(pods)
// classify pods
collectors, learners, coordinator, aggregator, err = nervexutil.ClassifyPods(pods)
if err != nil {
return
}
return
}
func ListPodsWithSelector(dyi dynamic.Informers, namespace string, labelSelector labels.Selector) ([]*corev1.Pod, error) {
ret, err := dyi.PodInformer.Lister().ByNamespace(namespace).List(labelSelector)
if err != nil {
return nil, err
}
pods := []*corev1.Pod{}
for _, obj := range ret {
podUn := obj.(*unstructured.Unstructured)
var pod corev1.Pod
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(podUn.UnstructuredContent(), &pod); err != nil {
return nil, err
}
pods = append(pods, &pod)
}
return pods, nil
}
func FilterOutTerminatingPods(pods []*corev1.Pod) []*corev1.Pod {
results := []*corev1.Pod{}
for _, pod := range pods {
if isTerminating(pod) {
continue
}
results = append(results, pod)
}
return results
}
func DeletePodsAndServices(kubeClient *kubernetes.Clientset, pods []*corev1.Pod, njreq *servertypes.NerveXJobRequest, replicaType string) ([]string, error) {
results := []string{}
resources := servertypes.ResourceQuantity{}
var containerName, portName string
var defaultPort int32
ns := njreq.Namespace
switch replicaType {
case nervexutil.CollectorName:
resources = njreq.Collectors
containerName = nervexutil.DefaultCollectorContainerName
portName = nervexutil.DefaultCollectorPortName
defaultPort = nervexutil.DefaultCollectorPort
case nervexutil.LearnerName:
resources = njreq.Learners
containerName = nervexutil.DefaultLearnerContainerName
portName = nervexutil.DefaultLearnerPortName
defaultPort = nervexutil.DefaultLearnerPort
default:
}
for _, pod := range pods {
// stop once the requested number of replicas has been deleted
if len(results) >= resources.Replicas {
break
}
// delete pods
err := kubeClient.CoreV1().Pods(njreq.Namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
}
return results, err
}
// delete services
err = kubeClient.CoreV1().Services(njreq.Namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
}
return results, err
}
result := nervexutil.GetPodAccessURL(pod, ns, containerName, portName, defaultPort)
results = append(results, result)
}
return results, nil
}
// isTerminating returns true if pod's DeletionTimestamp has been set
func isTerminating(pod *corev1.Pod) bool {
return pod.DeletionTimestamp != nil
}
-package http
+package k8s
-import corev1 "k8s.io/api/core/v1"
+import (
+corev1 "k8s.io/api/core/v1"
+servertypes "go-sensephoenix.sensetime.com/nervex-operator/server/types"
+)
-func SetPodResources(template *corev1.Pod, resources ResourceQuantity, containerName string) {
+func SetPodResources(template *corev1.Pod, resources servertypes.ResourceQuantity, containerName string) {
for i := range template.Spec.Containers {
if template.Spec.Containers[i].Name != containerName {
continue
......@@ -15,17 +19,17 @@ func SetPodResources(template *corev1.Pod, resources ResourceQuantity, container
}
// cpu and memory must not be zero
-if !resources.Cpu.IsZero() {
-template.Spec.Containers[i].Resources.Limits[corev1.ResourceCPU] = resources.Cpu
-template.Spec.Containers[i].Resources.Requests[corev1.ResourceCPU] = resources.Cpu
+if !resources.CPU.IsZero() {
+template.Spec.Containers[i].Resources.Limits[corev1.ResourceCPU] = resources.CPU
+template.Spec.Containers[i].Resources.Requests[corev1.ResourceCPU] = resources.CPU
}
if !resources.Memory.IsZero() {
template.Spec.Containers[i].Resources.Limits[corev1.ResourceMemory] = resources.Memory
template.Spec.Containers[i].Resources.Requests[corev1.ResourceMemory] = resources.Memory
}
-if !resources.Gpu.IsZero() {
-template.Spec.Containers[i].Resources.Limits[corev1.ResourceName("nvidia.com/gpu")] = resources.Gpu
-template.Spec.Containers[i].Resources.Requests[corev1.ResourceName("nvidia.com/gpu")] = resources.Gpu
+if !resources.GPU.IsZero() {
+template.Spec.Containers[i].Resources.Limits[corev1.ResourceName("nvidia.com/gpu")] = resources.GPU
+template.Spec.Containers[i].Resources.Requests[corev1.ResourceName("nvidia.com/gpu")] = resources.GPU
}
}
}
package types
import (
"errors"
)
type NerveXError struct {
Type ErrorType `json:"type"`
Message string `json:"message"`
}
func (n *NerveXError) Error() string {
return n.Message
}
type ErrorType string
const (
// StatusCode = 500
ErrorUnknown ErrorType = "Unknown"
// StatusCode = 404
ErrorNotFound ErrorType = "NotFound"
// StatusCode = 409
ErrorAlreadyExists ErrorType = "AlreadyExists"
// StatusCode = 400
ErrorBadRequest ErrorType = "BadRequest"
)
func IsNotFound(err error) bool {
return TypeForError(err) == ErrorNotFound
}
func IsAlreadyExists(err error) bool {
return TypeForError(err) == ErrorAlreadyExists
}
func IsBadRequest(err error) bool {
return TypeForError(err) == ErrorBadRequest
}
func TypeForError(err error) ErrorType {
var nvxErr *NerveXError
if errors.As(err, &nvxErr) {
return nvxErr.Type
}
return ErrorUnknown
}
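
A minimal sketch of how the HTTP layer might translate these error types into the status codes noted in the comments above. The helper name and placement are assumptions; only the IsNotFound/IsAlreadyExists/IsBadRequest helpers come from this merge request.

```go
// Hypothetical helper: map a NerveXError's type to an HTTP status code,
// following the StatusCode comments on the ErrorType constants above.
package http

import (
	"net/http"

	servertypes "go-sensephoenix.sensetime.com/nervex-operator/server/types"
)

func statusCodeForError(err error) int {
	switch {
	case servertypes.IsNotFound(err):
		return http.StatusNotFound // 404
	case servertypes.IsAlreadyExists(err):
		return http.StatusConflict // 409
	case servertypes.IsBadRequest(err):
		return http.StatusBadRequest // 400
	default:
		return http.StatusInternalServerError // 500
	}
}
```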
-package http
+package types
import (
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
serverdynamic "go-sensephoenix.sensetime.com/nervex-operator/server/dynamic"
)
type NerveXServer struct {
KubeClient *kubernetes.Clientset
DynamicClient dynamic.Interface
Log logr.Logger
dyi serverdynamic.DynamicInformers
type NerveXJobRequestParams struct {
Namespace []string `json:"namespace"`
Coordinator []string `json:"coordinator"`
Name []string `json:"name"`
}
const (
RequestParamTypeNamespace string = "namespace"
RequestParamTypeCoordinator string = "coordinator"
RequestParamTypeName string = "name"
)
type NerveXJobRequest struct {
Namespace string `json:"namespace"`
Coordinator string `json:"coordinator"`
......@@ -25,29 +25,33 @@ type NerveXJobRequest struct {
type ResourceQuantity struct {
Replicas int `json:"replicas"`
-Cpu resource.Quantity `json:"cpus"`
-Gpu resource.Quantity `json:"gpus"`
+CPU resource.Quantity `json:"cpus"`
+GPU resource.Quantity `json:"gpus"`
Memory resource.Quantity `json:"memory"`
}
type Response struct {
Success bool `json:"success"`
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data"`
}
const (
CodeSuccess = iota
CodeFailed
)
type NerveXJobResponse struct {
Namespace string `json:"namespace"`
Coordinator string `json:"coordinator"`
Aggregator string `json:"aggregator"`
Collectors []string `json:"collectors"`
Learners []string `json:"learners"`
}
func NewNerveXServer(
kubeClient *kubernetes.Clientset,
dynamicClient dynamic.Interface,
log logr.Logger,
dyi serverdynamic.DynamicInformers) *NerveXServer {
return &NerveXServer{
KubeClient: kubeClient,
DynamicClient: dynamicClient,
Log: log,
dyi: dyi,
}
type ReplicaResponse struct {
Namespace string `json:"namespace"`
Coordinator string `json:"coordinator"`
ReplicaType string `json:"replicaType"`
Name string `json:"name"`
}
......@@ -109,7 +109,6 @@ func BuildPodFromTemplate(
podName = GenerateName(podName)
case AggregatorName:
portEnv = "AGGREGATOR_PORT"
case CoordinatorName:
portEnv = "COORDINATOR_PORT"
default:
......@@ -201,6 +200,14 @@ func ConcatURL(name, ns string, port int32) string {
return fmt.Sprintf("%s.%s:%d", name, ns, port)
}
func GetPodAccessURL(pod *corev1.Pod, namespace, containerName, portName string, defaultPort int32) string {
port, found := GetPortFromPod(pod, containerName, portName)
if !found {
port = defaultPort
}
return ConcatURL(pod.Name, namespace, port)
}
func ClassifyPods(pods []*corev1.Pod) (collectors []*corev1.Pod, learners []*corev1.Pod, coordinator *corev1.Pod, aggregator *corev1.Pod, err error) {
// filter out collectors
collectors, err = filterReplicaPods(pods, CollectorName)
......
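
The collector/learner entries returned by the replicas API are built with ConcatURL above; a tiny sketch of the resulting access-URL format, with an illustrative pod name and namespace:

```go
// Sketch of the "<name>.<namespace>:<port>" format produced by ConcatURL /
// GetPodAccessURL. The pod name and namespace below are made up for illustration.
package main

import "fmt"

// concatURL mirrors ConcatURL above.
func concatURL(name, ns string, port int32) string {
	return fmt.Sprintf("%s.%s:%d", name, ns, port)
}

func main() {
	fmt.Println(concatURL("nervexjob-example-collector-0", "default", 80))
	// Output: nervexjob-example-collector-0.default:80
}
```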