提交 dd056253 编写于 作者: L liqingping

feat: add ReplicasFailed api

上级 4df3471e
......@@ -76,37 +76,12 @@ func (s *NerveXServer) Replicas(w http.ResponseWriter, r *http.Request) {
case "DELETE":
msg = "successfully delete replicas"
reps, err = s.deleteReplicas(r)
default:
err = &servertypes.NerveXError{Type: servertypes.ErrorNotImplemented, Message: fmt.Sprintf("%s not implemented", r.Method)}
log.Error(err, "method not implemented")
}
var success bool = true
var code int = servertypes.CodeSuccess
var statusCode int = http.StatusOK
if err != nil {
success = false
code = servertypes.CodeFailed
msg = err.Error()
// define status code
if servertypes.IsNotFound(err) {
statusCode = http.StatusNotFound
} else if servertypes.IsAlreadyExists(err) {
statusCode = http.StatusConflict
} else if servertypes.IsBadRequest(err) {
statusCode = http.StatusBadRequest
} else {
statusCode = http.StatusInternalServerError
}
log.Error(err, "failed to process request")
}
// build response
rep := servertypes.Response{
Success: success,
Code: code,
Message: msg,
Data: reps,
}
rep, statusCode := s.buildResponse(reps, msg, err)
// write response
if err = writeResponse(w, rep, statusCode); err != nil {
......@@ -265,30 +240,6 @@ func (s *NerveXServer) getNamespacedReplicasByCoordinatorAndName(namespace, coor
}
func (s *NerveXServer) ReplicasFailed(w http.ResponseWriter, r *http.Request) {
log := s.Log.WithName("NerveXServer")
// parse request body
var njreq servertypes.NerveXJobResponse
err := json.NewDecoder(r.Body).Decode(&njreq)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
log.Info("delete request body: ", "request", njreq)
// rep, err := s.deleteReplicas(njreq)
// if err != nil {
// log.Error(err, "failed to delete replicas")
// http.Error(w, err.Error(), http.StatusInternalServerError)
// }
// // write response
// if err = writeResponse(w, rep); err != nil {
// log.Error(err, "failed to write response")
// }
}
// add replicas api
func (s *NerveXServer) addReplicas(r *http.Request) (servertypes.NerveXJobResponse, error) {
log := s.Log.WithName("NerveXServer")
......@@ -353,13 +304,13 @@ func (s *NerveXServer) deleteReplicas(r *http.Request) (servertypes.NerveXJobRes
}
// delete collector pods
delCollectors, err := serverk8s.DeleteReplicas(s.KubeClient, collectors, &njreq, nervexutil.CollectorName)
delCollectors, err := serverk8s.DeleteReplicas(s.KubeClient, collectors, njreq.Namespace, njreq.Collectors.Replicas, nervexutil.CollectorName)
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
// delete learner pods
delLearners, err := serverk8s.DeleteReplicas(s.KubeClient, learners, &njreq, nervexutil.LearnerName)
delLearners, err := serverk8s.DeleteReplicas(s.KubeClient, learners, njreq.Namespace, njreq.Learners.Replicas, nervexutil.LearnerName)
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
......@@ -376,6 +327,98 @@ func (s *NerveXServer) deleteReplicas(r *http.Request) (servertypes.NerveXJobRes
return rep, nil
}
// ReplicasFailed will delete the failed replicas reported by caller, and recreate the same number of replicas
func (s *NerveXServer) ReplicasFailed(w http.ResponseWriter, r *http.Request) {
log := s.Log.WithName("NerveXServer")
var reps interface{}
var err error
var msg string
switch r.Method {
case "POST":
msg = "successfully recreate replicas"
reps, err = s.replicasFailed(r)
default:
err = &servertypes.NerveXError{Type: servertypes.ErrorNotImplemented, Message: fmt.Sprintf("%s not implemented", r.Method)}
log.Error(err, "method not implemented")
}
rep, statusCode := s.buildResponse(reps, msg, err)
// write response
if err = writeResponse(w, rep, statusCode); err != nil {
log.Error(err, "failed to write response")
}
}
func (s *NerveXServer) replicasFailed(r *http.Request) (servertypes.NerveXJobResponse, error) {
log := s.Log.WithName("NerveXServer")
// parse request body
var njreq servertypes.NerveXJobResponse
err := json.NewDecoder(r.Body).Decode(&njreq)
if err != nil {
errMsg := fmt.Sprintf("failed to decode request body: %v", err)
return servertypes.NerveXJobResponse{}, &servertypes.NerveXError{Type: servertypes.ErrorBadRequest, Message: errMsg}
}
log.Info("delete request body: ", "request", njreq)
cpods, err := serverk8s.GetPodsByNames(s.dyi, njreq.Namespace, njreq.Collectors)
collectors, err := serverk8s.RecreateReplicas(s.KubeClient, cpods, njreq.Namespace, nervexutil.CollectorName)
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
lpods, err := serverk8s.GetPodsByNames(s.dyi, njreq.Namespace, njreq.Learners)
learners, err := serverk8s.RecreateReplicas(s.KubeClient, lpods, njreq.Namespace, nervexutil.LearnerName)
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
rep := servertypes.NerveXJobResponse{
Namespace: njreq.Namespace,
Coordinator: njreq.Coordinator,
Collectors: collectors,
Learners: learners,
}
return rep, nil
}
func (s *NerveXServer) buildResponse(reps interface{}, msg string, err error) (servertypes.Response, int) {
log := s.Log.WithName("NerveXServer")
var success bool = true
var code int = servertypes.CodeSuccess
var statusCode int = http.StatusOK
if err != nil {
success = false
code = servertypes.CodeFailed
msg = err.Error()
// define status code
if servertypes.IsNotFound(err) {
statusCode = http.StatusNotFound
} else if servertypes.IsAlreadyExists(err) {
statusCode = http.StatusConflict
} else if servertypes.IsBadRequest(err) {
statusCode = http.StatusBadRequest
} else if servertypes.IsNotImplemented(err) {
statusCode = http.StatusNotImplemented
} else {
statusCode = http.StatusInternalServerError
}
log.Error(err, "failed to process request")
}
// build response
rep := servertypes.Response{
Success: success,
Code: code,
Message: msg,
Data: reps,
}
return rep, statusCode
}
func writeResponse(w http.ResponseWriter, rep servertypes.Response, statusCode int) error {
w.Header().Set("Conten-Type", "application/json")
w.WriteHeader(statusCode)
......
......@@ -2,7 +2,9 @@ package k8s
import (
"fmt"
"strings"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
......@@ -84,8 +86,7 @@ func CreateCollectorsAndLearnersForNerveXJob(
// create collectors
collectorTemplate := job.Spec.Collector.Template
collectors, err := CreatePodsAndServices(kubeClient, &collectorTemplate, ownRefer, njreq,
nervexutil.CollectorName, nervexutil.DefaultCollectorContainerName, nervexutil.DefaultCollectorPortName, nervexutil.DefaultCollectorPort)
collectors, err := CreateReplicas(kubeClient, &collectorTemplate, ownRefer, njreq.Collectors, njreq.Namespace, nervexutil.CollectorName)
if err != nil {
return collectors, nil, err
......@@ -93,8 +94,7 @@ func CreateCollectorsAndLearnersForNerveXJob(
// create learners
learnerTemplate := job.Spec.Learner.Template
learners, err := CreatePodsAndServices(kubeClient, &learnerTemplate, ownRefer, njreq,
nervexutil.LearnerName, nervexutil.DefaultLearnerContainerName, nervexutil.DefaultLearnerPortName, nervexutil.DefaultLearnerPort)
learners, err := CreateReplicas(kubeClient, &learnerTemplate, ownRefer, njreq.Collectors, njreq.Namespace, nervexutil.LearnerName)
if err != nil {
return collectors, learners, err
......@@ -102,3 +102,148 @@ func CreateCollectorsAndLearnersForNerveXJob(
return collectors, learners, nil
}
func CreateReplicas(
kubeClient *kubernetes.Clientset,
template *corev1.PodTemplateSpec,
ownRefer metav1.OwnerReference,
resources servertypes.ResourceQuantity,
namespace string,
replicaType string) ([]string, error) {
var containerName, portName string
var defaultPort int32
switch replicaType {
case nervexutil.CollectorName:
containerName = nervexutil.DefaultCollectorContainerName
portName = nervexutil.DefaultCollectorPortName
defaultPort = nervexutil.DefaultCollectorPort
case nervexutil.LearnerName:
containerName = nervexutil.DefaultLearnerContainerName
portName = nervexutil.DefaultLearnerPortName
defaultPort = nervexutil.DefaultLearnerPort
default:
}
results := []string{}
// create pods and services
for i := 0; i < resources.Replicas; i++ {
// build pod
pod, port, err := nervexutil.BuildPodFromTemplate(template.DeepCopy(), ownRefer, namespace, replicaType, containerName, portName, defaultPort)
if err != nil {
return results, err
}
// set pod resources
SetPodResources(pod, resources, containerName)
// build service
svc := nervexutil.BuildService(pod.GetLabels(), port, portName)
svc.SetOwnerReferences([]metav1.OwnerReference{ownRefer})
svc.Name = pod.Name
// create pod
if err := CreatePodAndService(kubeClient, namespace, pod, svc); err != nil {
return results, err
}
result := nervexutil.ConcatURL(svc.Name, namespace, port)
results = append(results, result)
}
return results, nil
}
func DeleteReplicas(kubeClient *kubernetes.Clientset, pods []*corev1.Pod, namespace string, replicas int, replicaType string) ([]string, error) {
var containerName, portName string
var defaultPort int32
switch replicaType {
case nervexutil.CollectorName:
containerName = nervexutil.DefaultCollectorContainerName
portName = nervexutil.DefaultCollectorPortName
defaultPort = nervexutil.DefaultCollectorPort
case nervexutil.LearnerName:
containerName = nervexutil.DefaultLearnerContainerName
portName = nervexutil.DefaultLearnerPortName
defaultPort = nervexutil.DefaultLearnerPort
default:
}
results := []string{}
for _, pod := range pods {
// break if enough
if len(results) >= replicas {
break
}
// delete pods and services
if err := DeletePodAndService(kubeClient, namespace, pod.Name); err != nil {
return results, err
}
result := nervexutil.GetPodAccessURL(pod, namespace, containerName, portName, defaultPort)
results = append(results, result)
}
return results, nil
}
func RecreateReplicas(kubeClient *kubernetes.Clientset, pods []*corev1.Pod, namespace, replicaType string) ([]string, error) {
var containerName, portName string
var defaultPort int32
switch replicaType {
case nervexutil.CollectorName:
containerName = nervexutil.DefaultCollectorContainerName
portName = nervexutil.DefaultCollectorPortName
defaultPort = nervexutil.DefaultCollectorPort
case nervexutil.LearnerName:
containerName = nervexutil.DefaultLearnerContainerName
portName = nervexutil.DefaultLearnerPortName
defaultPort = nervexutil.DefaultLearnerPort
default:
}
// delete pods and services
for _, pod := range pods {
if err := DeletePodAndService(kubeClient, namespace, pod.Name); err != nil {
return nil, err
}
}
// create pods and services
var results []string
for _, oldPod := range pods {
var pod *corev1.Pod = &corev1.Pod{}
parts := strings.Split(oldPod.Name, "-")
generateName := strings.Join(parts[:len(parts)-1], "-")
name := nervexutil.GenerateName(generateName)
pod.SetName(name)
pod.SetOwnerReferences(oldPod.GetOwnerReferences())
pod.Spec = oldPod.DeepCopy().Spec
labels := oldPod.GetLabels()
labels[nervexutil.PodNameLabel] = name
nervexutil.AddLabelsToPod(pod, labels)
// build service
port, ok := nervexutil.GetPortFromPod(pod, containerName, portName)
if !ok {
port = defaultPort
}
svc := nervexutil.BuildService(pod.GetLabels(), port, portName)
svc.SetOwnerReferences(pod.GetOwnerReferences())
svc.Name = pod.Name
if err := CreatePodAndService(kubeClient, namespace, pod, svc); err != nil {
return results, err
}
result := nervexutil.ConcatURL(svc.Name, namespace, port)
results = append(results, result)
}
return results, nil
}
......@@ -6,6 +6,7 @@ import (
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
......@@ -17,6 +18,33 @@ import (
nervexutil "go-sensephoenix.sensetime.com/nervex-operator/utils"
)
func GetPodsByNames(dyi dynamic.Informers, namespace string, names []string) ([]*corev1.Pod, error) {
var keys []string
var pods []*corev1.Pod
for _, name := range names {
key := nervexutil.NamespacedName(namespace, name)
keys = append(keys, key)
}
pods, err := GetPodsByKeys(dyi, keys)
if err != nil {
return pods, err
}
return pods, nil
}
func GetPodsByKeys(dyi dynamic.Informers, keys []string) ([]*corev1.Pod, error) {
var pods []*corev1.Pod
for _, key := range keys {
pod, err := GetPodByKey(dyi, key)
if err != nil {
return pods, err
}
pods = append(pods, pod)
}
return pods, nil
}
func GetPodByKey(dyi dynamic.Informers, key string) (*corev1.Pod, error) {
obj, exists, err := dyi.PodInformer.Informer().GetIndexer().GetByKey(key)
if err != nil {
......@@ -39,68 +67,98 @@ func GetPodByKey(dyi dynamic.Informers, key string) (*corev1.Pod, error) {
return &pod, nil
}
func CreatePodsAndServices(
kubeClient *kubernetes.Clientset,
template *corev1.PodTemplateSpec,
ownRefer metav1.OwnerReference,
njreq *servertypes.NerveXJobRequest,
replicaType, containerName, portName string, defaultPort int32) ([]string, error) {
ns := njreq.Namespace
resources := servertypes.ResourceQuantity{}
switch replicaType {
case nervexutil.CollectorName:
resources = njreq.Collectors
case nervexutil.LearnerName:
resources = njreq.Learners
}
results := []string{}
// create pods and services
for i := 0; i < resources.Replicas; i++ {
// build pod
pod, port, err := nervexutil.BuildPodFromTemplate(template.DeepCopy(), ownRefer, ns, replicaType, containerName, portName, defaultPort)
if err != nil {
return results, err
func CreatePodAndService(kubeClient *kubernetes.Clientset, namespace string, pod *corev1.Pod, service *corev1.Service) error {
// create pod
_, err := kubeClient.CoreV1().Pods(namespace).Create(context.Background(), pod, metav1.CreateOptions{})
if err != nil {
if k8serrors.IsAlreadyExists(err) {
return &servertypes.NerveXError{Type: servertypes.ErrorAlreadyExists, Message: err.Error()}
}
// set pod resources
SetPodResources(pod, resources, containerName)
// create pod
_, err = kubeClient.CoreV1().Pods(ns).Create(context.Background(), pod, metav1.CreateOptions{})
if err != nil {
// continue if pod already exists
if k8serrors.IsAlreadyExists(err) {
continue
}
return results, err
return err
}
// create service
_, err = kubeClient.CoreV1().Services(namespace).Create(context.Background(), service, metav1.CreateOptions{})
if err != nil {
if k8serrors.IsAlreadyExists(err) {
return &servertypes.NerveXError{Type: servertypes.ErrorAlreadyExists, Message: err.Error()}
}
return err
}
return nil
}
// build service
svc := nervexutil.BuildService(pod.GetLabels(), port, portName)
svc.SetOwnerReferences([]metav1.OwnerReference{ownRefer})
svc.Name = pod.Name
func DeletePodAndService(kubeClient *kubernetes.Clientset, namespace, name string) error {
// delete pods
if err := kubeClient.CoreV1().Pods(namespace).Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) {
return err
}
// create service
_, err = kubeClient.CoreV1().Services(ns).Create(context.Background(), svc, metav1.CreateOptions{})
if err != nil {
if k8serrors.IsAlreadyExists(err) {
continue
}
return results, err
// delete services
if err := kubeClient.CoreV1().Services(namespace).Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) {
return err
}
return nil
}
func SetPodResources(pod *corev1.Pod, resources servertypes.ResourceQuantity, containerName string) {
for i := range pod.Spec.Containers {
if pod.Spec.Containers[i].Name != containerName {
continue
}
if pod.Spec.Containers[i].Resources.Limits == nil {
pod.Spec.Containers[i].Resources.Limits = make(corev1.ResourceList)
}
if pod.Spec.Containers[i].Resources.Requests == nil {
pod.Spec.Containers[i].Resources.Requests = make(corev1.ResourceList)
}
result := nervexutil.ConcatURL(svc.Name, ns, port)
results = append(results, result)
// cpu and memory must not be zero
if !resources.CPU.IsZero() {
pod.Spec.Containers[i].Resources.Limits[corev1.ResourceCPU] = resources.CPU
pod.Spec.Containers[i].Resources.Requests[corev1.ResourceCPU] = resources.CPU
}
if !resources.Memory.IsZero() {
pod.Spec.Containers[i].Resources.Limits[corev1.ResourceMemory] = resources.Memory
pod.Spec.Containers[i].Resources.Requests[corev1.ResourceMemory] = resources.Memory
}
if !resources.GPU.IsZero() {
pod.Spec.Containers[i].Resources.Limits[corev1.ResourceName("nvidia.com/gpu")] = resources.GPU
pod.Spec.Containers[i].Resources.Requests[corev1.ResourceName("nvidia.com/gpu")] = resources.GPU
}
}
}
return results, nil
func GetPodResources(pod *corev1.Pod, containerName string) servertypes.ResourceQuantity {
resource := servertypes.ResourceQuantity{
CPU: resource.MustParse("0"),
GPU: resource.MustParse("0"),
Memory: resource.MustParse("0"),
}
for _, container := range pod.Spec.Containers {
if container.Name != containerName {
continue
}
if container.Resources.Limits == nil && container.Resources.Requests == nil {
break
}
if container.Resources.Requests != nil {
resource.CPU = container.Resources.Requests[corev1.ResourceCPU].DeepCopy()
resource.GPU = container.Resources.Requests[corev1.ResourceName("nvidia.com/gpu")].DeepCopy()
resource.Memory = container.Resources.Requests[corev1.ResourceMemory].DeepCopy()
}
if container.Resources.Limits != nil {
resource.CPU = container.Resources.Limits[corev1.ResourceCPU].DeepCopy()
resource.GPU = container.Resources.Limits[corev1.ResourceName("nvidia.com/gpu")].DeepCopy()
resource.Memory = container.Resources.Limits[corev1.ResourceMemory].DeepCopy()
}
}
return resource
}
func ListReplicaPodsWithSelector(dyi dynamic.Informers, ns string, labelSelector labels.Selector) (
func ListReplicaPodsWithSelector(dyi dynamic.Informers, namespace string, labelSelector labels.Selector) (
collectors []*corev1.Pod, learners []*corev1.Pod, coordinator *corev1.Pod, aggregator *corev1.Pod, err error) {
// list pods that belong to the NerveXJob
pods, err := ListPodsWithSelector(dyi, ns, labelSelector)
pods, err := ListPodsWithSelector(dyi, namespace, labelSelector)
if err != nil {
return
}
......@@ -147,60 +205,6 @@ func FilterOutTerminatingPods(pods []*corev1.Pod) []*corev1.Pod {
return results
}
func DeleteReplicas(kubeClient *kubernetes.Clientset, pods []*corev1.Pod, njreq *servertypes.NerveXJobRequest, replicaType string) ([]string, error) {
results := []string{}
resources := servertypes.ResourceQuantity{}
var containerName, portName string
var defaultPort int32
ns := njreq.Namespace
switch replicaType {
case nervexutil.CollectorName:
resources = njreq.Collectors
containerName = nervexutil.DefaultCollectorContainerName
portName = nervexutil.DefaultCollectorPortName
defaultPort = nervexutil.DefaultCollectorPort
case nervexutil.LearnerName:
resources = njreq.Learners
containerName = nervexutil.DefaultLearnerContainerName
portName = nervexutil.DefaultLearnerPortName
defaultPort = nervexutil.DefaultLearnerPort
default:
}
for _, pod := range pods {
// break if enough
if len(results) >= resources.Replicas {
break
}
// delete pods
err := kubeClient.CoreV1().Pods(njreq.Namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
if err != nil {
// continue if pod does not exist
if k8serrors.IsNotFound(err) {
continue
}
return results, err
}
// delete services
err = kubeClient.CoreV1().Services(njreq.Namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
continue
}
return results, err
}
result := nervexutil.GetPodAccessURL(pod, ns, containerName, portName, defaultPort)
results = append(results, result)
}
return results, nil
}
// isTerminating returns true if pod's DeletionTimestamp has been set
func isTerminating(pod *corev1.Pod) bool {
return pod.DeletionTimestamp != nil
......
package k8s
import (
corev1 "k8s.io/api/core/v1"
servertypes "go-sensephoenix.sensetime.com/nervex-operator/server/types"
)
func SetPodResources(template *corev1.Pod, resources servertypes.ResourceQuantity, containerName string) {
for i := range template.Spec.Containers {
if template.Spec.Containers[i].Name != containerName {
continue
}
if template.Spec.Containers[i].Resources.Limits == nil {
template.Spec.Containers[i].Resources.Limits = make(corev1.ResourceList)
}
if template.Spec.Containers[i].Resources.Requests == nil {
template.Spec.Containers[i].Resources.Requests = make(corev1.ResourceList)
}
// cpu and memory must not be zero
if !resources.CPU.IsZero() {
template.Spec.Containers[i].Resources.Limits[corev1.ResourceCPU] = resources.CPU
template.Spec.Containers[i].Resources.Requests[corev1.ResourceCPU] = resources.CPU
}
if !resources.Memory.IsZero() {
template.Spec.Containers[i].Resources.Limits[corev1.ResourceMemory] = resources.Memory
template.Spec.Containers[i].Resources.Requests[corev1.ResourceMemory] = resources.Memory
}
if !resources.GPU.IsZero() {
template.Spec.Containers[i].Resources.Limits[corev1.ResourceName("nvidia.com/gpu")] = resources.GPU
template.Spec.Containers[i].Resources.Requests[corev1.ResourceName("nvidia.com/gpu")] = resources.GPU
}
}
}
......@@ -27,6 +27,9 @@ const (
// StatusCode = 400
ErrorBadRequest ErrorType = "BadRequest"
// StatusCode = 501
ErrorNotImplemented ErrorType = "NotImplemented"
)
func IsNotFound(err error) bool {
......@@ -41,6 +44,10 @@ func IsBadRequest(err error) bool {
return TypeForError(err) == ErrorBadRequest
}
func IsNotImplemented(err error) bool {
return TypeForError(err) == ErrorNotImplemented
}
func TypeForError(err error) ErrorType {
var nvxErr *NerveXError
if errors.As(err, &nvxErr) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册