提交 dffd1fcf 编写于 作者: L liqingping

feat: add error definition to support multiple status code

上级 1e1343b5
package errors
import (
"errors"
)
type NerveXError struct {
Type ErrorType `json:"type"`
Message string `json:"message"`
}
func (n *NerveXError) Error() string {
return n.Message
}
type ErrorType string
const (
// StatusCode = 500
ErrorUnknown ErrorType = "Unknown"
// StatusCode = 404
ErrorNotFound ErrorType = "NotFound"
// StatusCode = 409
ErrorAlreadyExists ErrorType = "AlreadyExists"
// StatusCode = 400
ErrorBadRequest ErrorType = "BadRequest"
)
func IsNotFound(err error) bool {
return TypeForError(err) == ErrorNotFound
}
func IsAlreadyExists(err error) bool {
return TypeForError(err) == ErrorAlreadyExists
}
func IsBadRequest(err error) bool {
return TypeForError(err) == ErrorBadRequest
}
func TypeForError(err error) ErrorType {
var nvxErr *NerveXError
if errors.As(err, &nvxErr) {
return nvxErr.Type
}
return ErrorUnknown
}
......@@ -9,12 +9,14 @@ import (
"strings"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
nervexv1alpha1 "go-sensephoenix.sensetime.com/nervex-operator/api/v1alpha1"
. "go-sensephoenix.sensetime.com/nervex-operator/server/errors"
nervexutil "go-sensephoenix.sensetime.com/nervex-operator/utils"
)
......@@ -58,10 +60,24 @@ func (s *NerveXServer) Replicas(w http.ResponseWriter, r *http.Request) {
var success bool = true
var code int = CodeSuccess
var statusCode int = http.StatusOK
if err != nil {
success = false
code = CodeFailed
msg = err.Error()
// define status code
if IsNotFound(err) {
statusCode = http.StatusNotFound
} else if IsAlreadyExists(err) {
statusCode = http.StatusConflict
} else if IsBadRequest(err) {
statusCode = http.StatusBadRequest
} else {
statusCode = http.StatusInternalServerError
}
log.Error(err, "failed to process request")
}
// build response
......@@ -73,14 +89,12 @@ func (s *NerveXServer) Replicas(w http.ResponseWriter, r *http.Request) {
}
// write response
if err = writeResponse(w, rep); err != nil {
if err = writeResponse(w, rep, statusCode); err != nil {
log.Error(err, "failed to write response")
}
}
func (s *NerveXServer) getReplicas(r *http.Request) (interface{}, error) {
log := s.Log.WithName("NerveXServer")
// get request params from request
rp := NerveXJobRequestParams{}
params := r.URL.Query()
......@@ -93,8 +107,8 @@ func (s *NerveXServer) getReplicas(r *http.Request) (interface{}, error) {
case RequestParamTypeName:
rp.Name = v
default:
errInfo := fmt.Errorf("request param %s is not supported, ignored", k)
log.Error(errInfo, "")
errInfo := fmt.Sprintf("request param %s is not supported", k)
return nil, &NerveXError{Type: ErrorBadRequest, Message: errInfo}
}
}
......@@ -256,27 +270,23 @@ func (s *NerveXServer) ReplicasFailed(w http.ResponseWriter, r *http.Request) {
}
func (s *NerveXServer) addReplicas(r *http.Request) (NerveXJobResponse, error) {
log := s.Log.WithName("NerveXServer")
// get request body
var njreq NerveXJobRequest
err := json.NewDecoder(r.Body).Decode(&njreq)
if err != nil {
return NerveXJobResponse{}, err
errMsg := fmt.Sprintf("failed to decode request body: %v", err)
return NerveXJobResponse{}, &NerveXError{Type: ErrorBadRequest, Message: errMsg}
}
// get ownReference of request coordinator
nvxJob, err := s.getNerveXJob(njreq.Namespace, njreq.Coordinator)
if err != nil {
log.Error(err, "failed to get NerveXJob from coordinator", "coordinator", nervexutil.NamespacedName(njreq.Namespace, njreq.Coordinator))
return NerveXJobResponse{}, err
}
// create collectors and learners
collectors, learners, err := s.createCollectorsAndLearnersFromNerveXJob(&njreq, nvxJob)
if err != nil {
errMsg := fmt.Sprintf("failed create collectors and learners: %s", err)
log.Error(err, errMsg)
return NerveXJobResponse{}, err
}
......@@ -291,19 +301,17 @@ func (s *NerveXServer) addReplicas(r *http.Request) (NerveXJobResponse, error) {
}
func (s *NerveXServer) deleteReplicas(r *http.Request) (NerveXJobResponse, error) {
log := s.Log.WithName("NerveXServer")
// get request body
var njreq NerveXJobRequest
err := json.NewDecoder(r.Body).Decode(&njreq)
if err != nil {
return NerveXJobResponse{}, err
errMsg := fmt.Sprintf("failed to decode request body: %v", err)
return NerveXJobResponse{}, &NerveXError{Type: ErrorBadRequest, Message: errMsg}
}
// get ownReference of the request coordinator
nvxJob, err := s.getNerveXJob(njreq.Namespace, njreq.Coordinator)
if err != nil {
log.Error(err, "failed to get owner reference")
return NerveXJobResponse{}, err
}
......@@ -316,23 +324,18 @@ func (s *NerveXServer) deleteReplicas(r *http.Request) (NerveXJobResponse, error
}
collectors, learners, _, _, err := s.listReplicaPodsWithSelector(njreq.Namespace, labelSelector)
if err != nil {
log.Error(err, "failed to list collectors and learners")
return NerveXJobResponse{}, err
}
// delete collector pods
delCollectors, err := s.DeletePodsAndServices(collectors, &njreq, nervexutil.CollectorName)
if err != nil {
errMsg := fmt.Sprintf("failed to delete collector: %s", err)
log.Error(err, errMsg)
return NerveXJobResponse{}, err
}
// delete learner pods
delLearners, err := s.DeletePodsAndServices(learners, &njreq, nervexutil.LearnerName)
if err != nil {
errMsg := fmt.Sprintf("failed to delete learner: %s", err)
log.Error(err, errMsg)
return NerveXJobResponse{}, err
}
......@@ -364,8 +367,8 @@ func (s *NerveXServer) getNerveXJob(namespace, coordinatorName string) (*nervexv
}
}
if !ownByNerveX {
errMsg := "coordinator is not owned by NerveXJob"
return nil, fmt.Errorf(errMsg)
errMsg := fmt.Sprintf("coordinator %s is not owned by any NerveXJob", coordinatorName)
return nil, &NerveXError{Type: ErrorNotFound, Message: errMsg}
}
// get NerveXJob
......@@ -387,7 +390,7 @@ func (s *NerveXServer) getNerveXJobByKey(key string) (*nervexv1alpha1.NerveXJob,
if !exists {
errMsg := fmt.Sprintf("NerveXJob: %s not exists in cache", key)
return nil, fmt.Errorf(errMsg)
return nil, &NerveXError{Type: ErrorNotFound, Message: errMsg}
}
nvxUn := obj.(*unstructured.Unstructured)
var nvxJob nervexv1alpha1.NerveXJob
......@@ -408,7 +411,7 @@ func (s *NerveXServer) getPodByKey(key string) (*corev1.Pod, error) {
}
if !exists {
errMsg := fmt.Sprintf("pod: %s not exists in cache", key)
return nil, fmt.Errorf(errMsg)
return nil, &NerveXError{Type: ErrorNotFound, Message: errMsg}
}
podUn := obj.(*unstructured.Unstructured)
......@@ -422,9 +425,9 @@ func (s *NerveXServer) getPodByKey(key string) (*corev1.Pod, error) {
return &pod, nil
}
func writeResponse(w http.ResponseWriter, rep Response) error {
func writeResponse(w http.ResponseWriter, rep Response, statusCode int) error {
w.Header().Set("Conten-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.WriteHeader(statusCode)
repJSON, err := json.Marshal(rep)
if err != nil {
errMsg := fmt.Sprintf("failed to marshal json: %s", err)
......@@ -505,7 +508,7 @@ func (s *NerveXServer) createCollectorsAndLearnersFromNerveXJob(
nervexutil.CollectorName, nervexutil.DefaultCollectorContainerName, nervexutil.DefaultCollectorPortName, nervexutil.DefaultCollectorPort)
if err != nil {
return nil, nil, err
return collectors, nil, err
}
// create learners
......@@ -514,7 +517,7 @@ func (s *NerveXServer) createCollectorsAndLearnersFromNerveXJob(
nervexutil.LearnerName, nervexutil.DefaultLearnerContainerName, nervexutil.DefaultLearnerPortName, nervexutil.DefaultLearnerPort)
if err != nil {
return nil, nil, err
return collectors, learners, err
}
return collectors, learners, nil
......@@ -541,7 +544,7 @@ func (s *NerveXServer) createPodsAndServices(
// build pod
pod, port, err := nervexutil.BuildPodFromTemplate(template.DeepCopy(), ownRefer, ns, replicaType, containerName, portName, defaultPort)
if err != nil {
return nil, err
return results, err
}
// set pod resources
SetPodResources(pod, resources, containerName)
......@@ -549,7 +552,10 @@ func (s *NerveXServer) createPodsAndServices(
// create pod
_, err = s.KubeClient.CoreV1().Pods(ns).Create(context.Background(), pod, metav1.CreateOptions{})
if err != nil {
return nil, err
if k8serrors.IsAlreadyExists(err) {
return results, &NerveXError{Type: ErrorAlreadyExists, Message: err.Error()}
}
return results, err
}
// build service
......@@ -560,7 +566,10 @@ func (s *NerveXServer) createPodsAndServices(
// create service
_, err = s.KubeClient.CoreV1().Services(ns).Create(context.Background(), svc, metav1.CreateOptions{})
if err != nil {
return nil, err
if k8serrors.IsAlreadyExists(err) {
return results, &NerveXError{Type: ErrorAlreadyExists, Message: err.Error()}
}
return results, err
}
result := nervexutil.ConcatURL(svc.Name, ns, port)
......@@ -607,6 +616,8 @@ func (s *NerveXServer) DeletePodsAndServices(pods []*corev1.Pod, njreq *NerveXJo
containerName = nervexutil.DefaultLearnerContainerName
portName = nervexutil.DefaultLearnerPortName
defaultPort = nervexutil.DefaultLearnerPort
default:
return nil, &NerveXError{Type: ErrorBadRequest, Message: fmt.Sprintf("replica type %s is not supported", replicaType)}
}
for _, pod := range pods {
......@@ -615,12 +626,21 @@ func (s *NerveXServer) DeletePodsAndServices(pods []*corev1.Pod, njreq *NerveXJo
break
}
// delete pods
err := s.KubeClient.CoreV1().Pods(njreq.Namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
return results, &NerveXError{Type: ErrorNotFound, Message: err.Error()}
}
return results, err
}
// delete services
err = s.KubeClient.CoreV1().Services(njreq.Namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
return results, &NerveXError{Type: ErrorNotFound, Message: err.Error()}
}
return results, err
}
......
......@@ -109,7 +109,6 @@ func BuildPodFromTemplate(
podName = GenerateName(podName)
case AggregatorName:
portEnv = "AGGREGATOR_PORT"
case CoordinatorName:
portEnv = "COORDINATOR_PORT"
default:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册