提交 d3a8d8e0 编写于 作者: L liqingping

feat: update package structure

上级 b9fe9219
package k8s
package http
import (
"fmt"
......@@ -8,18 +8,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
nervexv1alpha1 "go-sensephoenix.sensetime.com/nervex-operator/api/v1alpha1"
"go-sensephoenix.sensetime.com/nervex-operator/server/dynamic"
servertypes "go-sensephoenix.sensetime.com/nervex-operator/server/types"
nervexutil "go-sensephoenix.sensetime.com/nervex-operator/utils"
)
func GetNerveXJob(dyi dynamic.Informers, namespace, coordinatorName string) (*nervexv1alpha1.NerveXJob, error) {
func (s *NerveXServer) GetNerveXJob(namespace, coordinatorName string) (*nervexv1alpha1.NerveXJob, error) {
// get coordinator
coorKey := nervexutil.NamespacedName(namespace, coordinatorName)
coordinator, err := GetPodByKey(dyi, coorKey)
coordinator, err := s.GetPodByKey(coorKey)
if err != nil {
return nil, err
}
......@@ -40,7 +38,7 @@ func GetNerveXJob(dyi dynamic.Informers, namespace, coordinatorName string) (*ne
// get NerveXJob
njKey := nervexutil.NamespacedName(namespace, ownRefer.Name)
nvxJob, err := GetNerveXJobByKey(dyi, njKey)
nvxJob, err := s.GetNerveXJobByKey(njKey)
if err != nil {
return nil, err
}
......@@ -48,8 +46,8 @@ func GetNerveXJob(dyi dynamic.Informers, namespace, coordinatorName string) (*ne
return nvxJob, nil
}
func GetNerveXJobByKey(dyi dynamic.Informers, key string) (*nervexv1alpha1.NerveXJob, error) {
obj, exists, err := dyi.NJInformer.Informer().GetIndexer().GetByKey(key)
func (s *NerveXServer) GetNerveXJobByKey(key string) (*nervexv1alpha1.NerveXJob, error) {
obj, exists, err := s.dyi.NJInformer.Informer().GetIndexer().GetByKey(key)
if err != nil {
errMsg := fmt.Sprintf("failed to get NerveXJob: %s", err)
return nil, fmt.Errorf(errMsg)
......@@ -70,8 +68,7 @@ func GetNerveXJobByKey(dyi dynamic.Informers, key string) (*nervexv1alpha1.Nerve
return &nvxJob, nil
}
func CreateCollectorsAndLearnersForNerveXJob(
kubeClient *kubernetes.Clientset,
func (s *NerveXServer) CreateCollectorsAndLearnersForNerveXJob(
njreq *servertypes.NerveXJobRequest,
job *nervexv1alpha1.NerveXJob) ([]string, []string, error) {
......@@ -86,7 +83,7 @@ func CreateCollectorsAndLearnersForNerveXJob(
// create collectors
collectorTemplate := job.Spec.Collector.Template
collectors, err := CreateReplicas(kubeClient, &collectorTemplate, ownRefer, njreq.Collectors, njreq.Namespace, nervexutil.CollectorName)
collectors, err := s.CreateReplicas(&collectorTemplate, ownRefer, njreq.Collectors, njreq.Namespace, nervexutil.CollectorName)
if err != nil {
return collectors, nil, err
......@@ -94,7 +91,7 @@ func CreateCollectorsAndLearnersForNerveXJob(
// create learners
learnerTemplate := job.Spec.Learner.Template
learners, err := CreateReplicas(kubeClient, &learnerTemplate, ownRefer, njreq.Collectors, njreq.Namespace, nervexutil.LearnerName)
learners, err := s.CreateReplicas(&learnerTemplate, ownRefer, njreq.Collectors, njreq.Namespace, nervexutil.LearnerName)
if err != nil {
return collectors, learners, err
......@@ -103,8 +100,7 @@ func CreateCollectorsAndLearnersForNerveXJob(
return collectors, learners, nil
}
func CreateReplicas(
kubeClient *kubernetes.Clientset,
func (s *NerveXServer) CreateReplicas(
template *corev1.PodTemplateSpec,
ownRefer metav1.OwnerReference,
resources servertypes.ResourceQuantity,
......@@ -134,7 +130,7 @@ func CreateReplicas(
return results, err
}
// set pod resources
SetPodResources(pod, resources, containerName)
s.SetPodResources(pod, resources, containerName)
// build service
svc := nervexutil.BuildService(pod.GetLabels(), port, portName)
......@@ -142,7 +138,7 @@ func CreateReplicas(
svc.Name = pod.Name
// create pod
if err := CreatePodAndService(kubeClient, namespace, pod, svc); err != nil {
if err := s.CreatePodAndService(namespace, pod, svc); err != nil {
return results, err
}
......@@ -153,7 +149,7 @@ func CreateReplicas(
return results, nil
}
func DeleteReplicas(kubeClient *kubernetes.Clientset, pods []*corev1.Pod, namespace string, replicas int, replicaType string) ([]string, error) {
func (s *NerveXServer) DeleteReplicas(pods []*corev1.Pod, namespace string, replicas int, replicaType string) ([]string, error) {
var containerName, portName string
var defaultPort int32
......@@ -178,7 +174,7 @@ func DeleteReplicas(kubeClient *kubernetes.Clientset, pods []*corev1.Pod, namesp
}
// delete pods and services
if err := DeletePodAndService(kubeClient, namespace, pod.Name); err != nil {
if err := s.DeletePodAndService(namespace, pod.Name); err != nil {
return results, err
}
......@@ -189,7 +185,7 @@ func DeleteReplicas(kubeClient *kubernetes.Clientset, pods []*corev1.Pod, namesp
return results, nil
}
func RecreateReplicas(kubeClient *kubernetes.Clientset, pods []*corev1.Pod, namespace, replicaType string) ([]string, error) {
func (s *NerveXServer) RecreateReplicas(pods []*corev1.Pod, namespace, replicaType string) ([]string, error) {
var containerName, portName string
var defaultPort int32
switch replicaType {
......@@ -207,7 +203,7 @@ func RecreateReplicas(kubeClient *kubernetes.Clientset, pods []*corev1.Pod, name
// delete pods and services
for _, pod := range pods {
if err := DeletePodAndService(kubeClient, namespace, pod.Name); err != nil {
if err := s.DeletePodAndService(namespace, pod.Name); err != nil {
return nil, err
}
}
......@@ -237,7 +233,7 @@ func RecreateReplicas(kubeClient *kubernetes.Clientset, pods []*corev1.Pod, name
svc.SetOwnerReferences(pod.GetOwnerReferences())
svc.Name = pod.Name
if err := CreatePodAndService(kubeClient, namespace, pod, svc); err != nil {
if err := s.CreatePodAndService(namespace, pod, svc); err != nil {
return results, err
}
......
package k8s
package http
import (
"context"
......@@ -11,14 +11,12 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"go-sensephoenix.sensetime.com/nervex-operator/server/dynamic"
servertypes "go-sensephoenix.sensetime.com/nervex-operator/server/types"
nervexutil "go-sensephoenix.sensetime.com/nervex-operator/utils"
)
func GetPodsByNames(dyi dynamic.Informers, namespace string, names []string) ([]*corev1.Pod, error) {
func (s *NerveXServer) GetPodsByNames(namespace string, names []string) ([]*corev1.Pod, error) {
var keys []string
var pods []*corev1.Pod
for _, name := range names {
......@@ -26,17 +24,17 @@ func GetPodsByNames(dyi dynamic.Informers, namespace string, names []string) ([]
keys = append(keys, key)
}
pods, err := GetPodsByKeys(dyi, keys)
pods, err := s.GetPodsByKeys(keys)
if err != nil {
return pods, err
}
return pods, nil
}
func GetPodsByKeys(dyi dynamic.Informers, keys []string) ([]*corev1.Pod, error) {
func (s *NerveXServer) GetPodsByKeys(keys []string) ([]*corev1.Pod, error) {
var pods []*corev1.Pod
for _, key := range keys {
pod, err := GetPodByKey(dyi, key)
pod, err := s.GetPodByKey(key)
if err != nil {
return pods, err
}
......@@ -45,8 +43,8 @@ func GetPodsByKeys(dyi dynamic.Informers, keys []string) ([]*corev1.Pod, error)
return pods, nil
}
func GetPodByKey(dyi dynamic.Informers, key string) (*corev1.Pod, error) {
obj, exists, err := dyi.PodInformer.Informer().GetIndexer().GetByKey(key)
func (s *NerveXServer) GetPodByKey(key string) (*corev1.Pod, error) {
obj, exists, err := s.dyi.PodInformer.Informer().GetIndexer().GetByKey(key)
if err != nil {
errMsg := fmt.Sprintf("failed to get pod: %s", err)
return nil, fmt.Errorf(errMsg)
......@@ -67,9 +65,9 @@ func GetPodByKey(dyi dynamic.Informers, key string) (*corev1.Pod, error) {
return &pod, nil
}
func CreatePodAndService(kubeClient *kubernetes.Clientset, namespace string, pod *corev1.Pod, service *corev1.Service) error {
func (s *NerveXServer) CreatePodAndService(namespace string, pod *corev1.Pod, service *corev1.Service) error {
// create pod
_, err := kubeClient.CoreV1().Pods(namespace).Create(context.Background(), pod, metav1.CreateOptions{})
_, err := s.KubeClient.CoreV1().Pods(namespace).Create(context.Background(), pod, metav1.CreateOptions{})
if err != nil {
if k8serrors.IsAlreadyExists(err) {
return &servertypes.NerveXError{Type: servertypes.ErrorAlreadyExists, Message: err.Error()}
......@@ -77,7 +75,7 @@ func CreatePodAndService(kubeClient *kubernetes.Clientset, namespace string, pod
return err
}
// create service
_, err = kubeClient.CoreV1().Services(namespace).Create(context.Background(), service, metav1.CreateOptions{})
_, err = s.KubeClient.CoreV1().Services(namespace).Create(context.Background(), service, metav1.CreateOptions{})
if err != nil {
if k8serrors.IsAlreadyExists(err) {
return &servertypes.NerveXError{Type: servertypes.ErrorAlreadyExists, Message: err.Error()}
......@@ -87,20 +85,20 @@ func CreatePodAndService(kubeClient *kubernetes.Clientset, namespace string, pod
return nil
}
func DeletePodAndService(kubeClient *kubernetes.Clientset, namespace, name string) error {
func (s *NerveXServer) DeletePodAndService(namespace, name string) error {
// delete pods
if err := kubeClient.CoreV1().Pods(namespace).Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) {
if err := s.KubeClient.CoreV1().Pods(namespace).Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) {
return err
}
// delete services
if err := kubeClient.CoreV1().Services(namespace).Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) {
if err := s.KubeClient.CoreV1().Services(namespace).Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) {
return err
}
return nil
}
func SetPodResources(pod *corev1.Pod, resources servertypes.ResourceQuantity, containerName string) {
func (s *NerveXServer) SetPodResources(pod *corev1.Pod, resources servertypes.ResourceQuantity, containerName string) {
for i := range pod.Spec.Containers {
if pod.Spec.Containers[i].Name != containerName {
continue
......@@ -128,7 +126,7 @@ func SetPodResources(pod *corev1.Pod, resources servertypes.ResourceQuantity, co
}
}
func GetPodResources(pod *corev1.Pod, containerName string) servertypes.ResourceQuantity {
func (s *NerveXServer) GetPodResources(pod *corev1.Pod, containerName string) servertypes.ResourceQuantity {
resource := servertypes.ResourceQuantity{
CPU: resource.MustParse("0"),
GPU: resource.MustParse("0"),
......@@ -155,16 +153,16 @@ func GetPodResources(pod *corev1.Pod, containerName string) servertypes.Resource
return resource
}
func ListReplicaPodsWithSelector(dyi dynamic.Informers, namespace string, labelSelector labels.Selector) (
func (s *NerveXServer) ListReplicaPodsWithSelector(namespace string, labelSelector labels.Selector) (
collectors []*corev1.Pod, learners []*corev1.Pod, coordinator *corev1.Pod, aggregator *corev1.Pod, err error) {
// list pods that belong to the NerveXJob
pods, err := ListPodsWithSelector(dyi, namespace, labelSelector)
pods, err := s.ListPodsWithSelector(namespace, labelSelector)
if err != nil {
return
}
// filter out terminating pods since these pods are deleted
pods = FilterOutTerminatingPods(pods)
pods = nervexutil.FilterOutTerminatingPods(pods)
// classify pods
collectors, learners, coordinator, aggregator, err = nervexutil.ClassifyPods(pods)
......@@ -174,8 +172,8 @@ func ListReplicaPodsWithSelector(dyi dynamic.Informers, namespace string, labelS
return
}
func ListPodsWithSelector(dyi dynamic.Informers, namespace string, labelSelector labels.Selector) ([]*corev1.Pod, error) {
ret, err := dyi.PodInformer.Lister().ByNamespace(namespace).List(labelSelector)
func (s *NerveXServer) ListPodsWithSelector(namespace string, labelSelector labels.Selector) ([]*corev1.Pod, error) {
ret, err := s.dyi.PodInformer.Lister().ByNamespace(namespace).List(labelSelector)
if err != nil {
return nil, err
}
......@@ -192,20 +190,3 @@ func ListPodsWithSelector(dyi dynamic.Informers, namespace string, labelSelector
return pods, nil
}
// FilterOutTerminatingPods returns the pods for which isTerminating reports
// false, in the same order they appear in pods.
//
// The returned slice is always non-nil, even when pods is empty or every pod
// is terminating. The input slice is not modified.
func FilterOutTerminatingPods(pods []*corev1.Pod) []*corev1.Pod {
	kept := []*corev1.Pod{}
	for i := range pods {
		// Keep only pods that are not in the process of being deleted.
		if candidate := pods[i]; !isTerminating(candidate) {
			kept = append(kept, candidate)
		}
	}
	return kept
}
// isTerminating reports whether the pod's DeletionTimestamp has been set.
// pod must be non-nil.
func isTerminating(pod *corev1.Pod) bool {
	deletedAt := pod.DeletionTimestamp
	return deletedAt != nil
}
......@@ -14,7 +14,6 @@ import (
nervexv1alpha1 "go-sensephoenix.sensetime.com/nervex-operator/api/v1alpha1"
serverdynamic "go-sensephoenix.sensetime.com/nervex-operator/server/dynamic"
serverk8s "go-sensephoenix.sensetime.com/nervex-operator/server/k8s"
servertypes "go-sensephoenix.sensetime.com/nervex-operator/server/types"
nervexutil "go-sensephoenix.sensetime.com/nervex-operator/utils"
)
......@@ -167,7 +166,7 @@ func (s *NerveXServer) getNamespacedReplicas(namespace string) ([]servertypes.Ne
}
// list coordinators in namespace
pods, err := serverk8s.ListPodsWithSelector(s.dyi, namespace, labelSelector)
pods, err := s.ListPodsWithSelector(namespace, labelSelector)
if err != nil {
return nil, err
}
......@@ -189,7 +188,7 @@ func (s *NerveXServer) getNamespacedReplicasByCoordinator(namespace, coordinator
log := s.Log.WithName("NerveXServer")
// get ownReference of the request coordinator
nvxJob, err := serverk8s.GetNerveXJob(s.dyi, namespace, coordinatorName)
nvxJob, err := s.GetNerveXJob(namespace, coordinatorName)
if err != nil {
log.Error(err, "failed to get owner reference")
return servertypes.NerveXJobResponse{}, err
......@@ -202,7 +201,7 @@ func (s *NerveXServer) getNamespacedReplicasByCoordinator(namespace, coordinator
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
collectors, learners, _, _, err := serverk8s.ListReplicaPodsWithSelector(s.dyi, namespace, labelSelector)
collectors, learners, _, _, err := s.ListReplicaPodsWithSelector(namespace, labelSelector)
if err != nil {
log.Error(err, "failed to list collectors and learners")
return servertypes.NerveXJobResponse{}, err
......@@ -252,13 +251,13 @@ func (s *NerveXServer) addReplicas(r *http.Request) (servertypes.NerveXJobRespon
}
// get ownReference of request coordinator
nvxJob, err := serverk8s.GetNerveXJob(s.dyi, njreq.Namespace, njreq.Coordinator)
nvxJob, err := s.GetNerveXJob(njreq.Namespace, njreq.Coordinator)
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
// create collectors and learners
collectors, learners, err := serverk8s.CreateCollectorsAndLearnersForNerveXJob(s.KubeClient, &njreq, nvxJob)
collectors, learners, err := s.CreateCollectorsAndLearnersForNerveXJob(&njreq, nvxJob)
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
......@@ -286,7 +285,7 @@ func (s *NerveXServer) deleteReplicas(r *http.Request) (servertypes.NerveXJobRes
}
// get ownReference of the request coordinator
nvxJob, err := serverk8s.GetNerveXJob(s.dyi, njreq.Namespace, njreq.Coordinator)
nvxJob, err := s.GetNerveXJob(njreq.Namespace, njreq.Coordinator)
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
......@@ -298,19 +297,19 @@ func (s *NerveXServer) deleteReplicas(r *http.Request) (servertypes.NerveXJobRes
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
collectors, learners, _, _, err := serverk8s.ListReplicaPodsWithSelector(s.dyi, njreq.Namespace, labelSelector)
collectors, learners, _, _, err := s.ListReplicaPodsWithSelector(njreq.Namespace, labelSelector)
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
// delete collector pods
delCollectors, err := serverk8s.DeleteReplicas(s.KubeClient, collectors, njreq.Namespace, njreq.Collectors.Replicas, nervexutil.CollectorName)
delCollectors, err := s.DeleteReplicas(collectors, njreq.Namespace, njreq.Collectors.Replicas, nervexutil.CollectorName)
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
// delete learner pods
delLearners, err := serverk8s.DeleteReplicas(s.KubeClient, learners, njreq.Namespace, njreq.Learners.Replicas, nervexutil.LearnerName)
delLearners, err := s.DeleteReplicas(learners, njreq.Namespace, njreq.Learners.Replicas, nervexutil.LearnerName)
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
......@@ -362,13 +361,13 @@ func (s *NerveXServer) replicasFailed(r *http.Request) (servertypes.NerveXJobRes
}
log.Info("delete request body: ", "request", njreq)
cpods, err := serverk8s.GetPodsByNames(s.dyi, njreq.Namespace, njreq.Collectors)
collectors, err := serverk8s.RecreateReplicas(s.KubeClient, cpods, njreq.Namespace, nervexutil.CollectorName)
cpods, err := s.GetPodsByNames(njreq.Namespace, njreq.Collectors)
collectors, err := s.RecreateReplicas(cpods, njreq.Namespace, nervexutil.CollectorName)
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
lpods, err := serverk8s.GetPodsByNames(s.dyi, njreq.Namespace, njreq.Learners)
learners, err := serverk8s.RecreateReplicas(s.KubeClient, lpods, njreq.Namespace, nervexutil.LearnerName)
lpods, err := s.GetPodsByNames(njreq.Namespace, njreq.Learners)
learners, err := s.RecreateReplicas(lpods, njreq.Namespace, nervexutil.LearnerName)
if err != nil {
return servertypes.NerveXJobResponse{}, err
}
......
......@@ -270,3 +270,20 @@ func filterReplicaPods(pods []*corev1.Pod, replicaType string) ([]*corev1.Pod, e
}
return result, nil
}
// FilterOutTerminatingPods returns the pods for which isTerminating reports
// false, in the same order they appear in pods.
//
// The returned slice is always non-nil, even when pods is empty or every pod
// is terminating. The input slice is not modified.
func FilterOutTerminatingPods(pods []*corev1.Pod) []*corev1.Pod {
	kept := []*corev1.Pod{}
	for i := range pods {
		// Keep only pods that are not in the process of being deleted.
		if candidate := pods[i]; !isTerminating(candidate) {
			kept = append(kept, candidate)
		}
	}
	return kept
}
// isTerminating reports whether the pod's DeletionTimestamp has been set.
// pod must be non-nil.
func isTerminating(pod *corev1.Pod) bool {
	deletedAt := pod.DeletionTimestamp
	return deletedAt != nil
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册