From f8e7d06b079af8f7542bdd068209bbbb822b2fab Mon Sep 17 00:00:00 2001 From: zryfish Date: Fri, 13 Mar 2020 21:57:48 +0800 Subject: [PATCH] move apigateway into apiserver (#1948) --- cmd/ks-apiserver/app/options/options.go | 72 ++- cmd/ks-apiserver/app/server.go | 3 +- pkg/apiserver/apiserver.go | 60 ++- .../authentication/token_authenticator.go | 29 ++ pkg/apiserver/dispatch/dispatch.go | 36 ++ pkg/apiserver/filters/authentication.go | 33 ++ pkg/apiserver/filters/authorization.go | 71 +++ pkg/apiserver/filters/dispatch.go | 32 ++ pkg/apiserver/filters/kubeapiserver.go | 44 ++ pkg/apiserver/filters/requestinfo.go | 22 + pkg/apiserver/request/requestinfo.go | 173 +++++++ pkg/apiserver/server/handler.go | 1 + pkg/apiserver/servicemesh/metrics/handlers.go | 70 --- pkg/apiserver/servicemesh/tracing/handlers.go | 45 -- pkg/kapis/resources/v1alpha3/register.go | 2 +- .../servicemesh/metrics/v1alpha2/handler.go | 109 ++++ .../servicemesh/metrics/v1alpha2/register.go | 31 +- .../resources/v1alpha2/resource/resources.go | 6 +- pkg/models/routers/routers.go | 2 +- pkg/server/filter/logging.go | 61 --- pkg/server/server.go | 24 - vendor/github.com/mxk/go-flowrate/LICENSE | 29 ++ .../mxk/go-flowrate/flowrate/flowrate.go | 267 ++++++++++ .../github.com/mxk/go-flowrate/flowrate/io.go | 133 +++++ .../mxk/go-flowrate/flowrate/util.go | 67 +++ .../apimachinery/pkg/util/proxy/dial.go | 117 +++++ .../k8s.io/apimachinery/pkg/util/proxy/doc.go | 18 + .../apimachinery/pkg/util/proxy/transport.go | 274 ++++++++++ .../pkg/util/proxy/upgradeaware.go | 480 ++++++++++++++++++ vendor/modules.txt | 3 + 30 files changed, 2057 insertions(+), 257 deletions(-) create mode 100644 pkg/apiserver/authentication/token_authenticator.go create mode 100644 pkg/apiserver/dispatch/dispatch.go create mode 100644 pkg/apiserver/filters/authentication.go create mode 100644 pkg/apiserver/filters/authorization.go create mode 100644 pkg/apiserver/filters/dispatch.go create mode 100644 pkg/apiserver/filters/kubeapiserver.go create mode 100644 pkg/apiserver/filters/requestinfo.go create mode 100644 pkg/apiserver/request/requestinfo.go create mode 100644 pkg/apiserver/server/handler.go delete mode 100644 pkg/apiserver/servicemesh/metrics/handlers.go delete mode 100644 pkg/apiserver/servicemesh/tracing/handlers.go delete mode 100644 pkg/server/filter/logging.go delete mode 100644 pkg/server/server.go create mode 100644 vendor/github.com/mxk/go-flowrate/LICENSE create mode 100644 vendor/github.com/mxk/go-flowrate/flowrate/flowrate.go create mode 100644 vendor/github.com/mxk/go-flowrate/flowrate/io.go create mode 100644 vendor/github.com/mxk/go-flowrate/flowrate/util.go create mode 100644 vendor/k8s.io/apimachinery/pkg/util/proxy/dial.go create mode 100644 vendor/k8s.io/apimachinery/pkg/util/proxy/doc.go create mode 100644 vendor/k8s.io/apimachinery/pkg/util/proxy/transport.go create mode 100644 vendor/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go diff --git a/cmd/ks-apiserver/app/options/options.go b/cmd/ks-apiserver/app/options/options.go index eb5eb5c9..6788b0c9 100644 --- a/cmd/ks-apiserver/app/options/options.go +++ b/cmd/ks-apiserver/app/options/options.go @@ -8,6 +8,7 @@ import ( "k8s.io/klog" "kubesphere.io/kubesphere/pkg/api/iam" "kubesphere.io/kubesphere/pkg/apiserver" + "kubesphere.io/kubesphere/pkg/informers" genericoptions "kubesphere.io/kubesphere/pkg/server/options" "kubesphere.io/kubesphere/pkg/simple/client/cache" "kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins" @@ -89,36 +90,66 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) { } func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIServer, error) { + apiServer := &apiserver.APIServer{} + kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions) if err != nil { return nil, err } + apiServer.KubernetesClient = kubernetesClient + + informerFactory := informers.NewInformerFactories(kubernetesClient.Kubernetes(), kubernetesClient.KubeSphere(), kubernetesClient.Istio(), kubernetesClient.Application()) + apiServer.InformerFactory = informerFactory monitoringClient := prometheus.NewPrometheus(s.MonitoringOptions) + apiServer.MonitoringClient = monitoringClient - loggingClient, err := esclient.NewElasticsearch(s.LoggingOptions) - if err != nil { - return nil, err + if s.LoggingOptions.Host != "" { + loggingClient, err := esclient.NewElasticsearch(s.LoggingOptions) + if err != nil { + return nil, err + } + apiServer.LoggingClient = loggingClient } - s3Client, err := s3.NewS3Client(s.S3Options) - if err != nil { - return nil, err + if s.S3Options.Endpoint != "" { + s3Client, err := s3.NewS3Client(s.S3Options) + if err != nil { + return nil, err + } + apiServer.S3Client = s3Client } - devopsClient, err := jenkins.NewDevopsClient(s.DevopsOptions) - if err != nil { - return nil, err + if s.DevopsOptions.Host != "" { + devopsClient, err := jenkins.NewDevopsClient(s.DevopsOptions) + if err != nil { + return nil, err + } + apiServer.DevopsClient = devopsClient } - ldapClient, err := ldap.NewLdapClient(s.LdapOptions, stopCh) - if err != nil { - return nil, err + if s.LdapOptions.Host != "" { + ldapClient, err := ldap.NewLdapClient(s.LdapOptions, stopCh) + if err != nil { + return nil, err + } + apiServer.LdapClient = ldapClient } - cacheClient, err := cache.NewRedisClient(s.CacheOptions, stopCh) - if err != nil { - return nil, err + if s.CacheOptions.RedisURL != "" { + cacheClient, err := cache.NewRedisClient(s.CacheOptions, stopCh) + if err != nil { + return nil, err + } + apiServer.CacheClient = cacheClient + } + + if s.MySQLOptions.Host != "" { + dbClient, err := mysql.NewMySQLClient(s.MySQLOptions, stopCh) + if err != nil { + return nil, err + } + apiServer.DBClient = dbClient } server := &http.Server{ @@ -133,16 +164,7 @@ func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIS server.TLSConfig.Certificates = []tls.Certificate{certificate} } - apiServer := &apiserver.APIServer{ - Server: server, - KubernetesClient: kubernetesClient, - MonitoringClient: monitoringClient, - LoggingClient: loggingClient, - S3Client: s3Client, - DevopsClient: devopsClient, - LdapClient: ldapClient, - CacheClient: cacheClient, - } + apiServer.Server = server return apiServer, nil } diff --git a/cmd/ks-apiserver/app/server.go b/cmd/ks-apiserver/app/server.go index 7fff6efc..b260ecb8 100644 --- a/cmd/ks-apiserver/app/server.go +++ b/cmd/ks-apiserver/app/server.go @@ -25,7 +25,6 @@ import ( cliflag "k8s.io/component-base/cli/flag" "kubesphere.io/kubesphere/cmd/ks-apiserver/app/options" apiserverconfig "kubesphere.io/kubesphere/pkg/apiserver/config" - "kubesphere.io/kubesphere/pkg/apiserver/servicemesh/tracing" "kubesphere.io/kubesphere/pkg/utils/signals" "kubesphere.io/kubesphere/pkg/utils/term" ) @@ -100,7 +99,7 @@ func initializeServicemeshConfig(s *options.ServerRunOptions) { // Initialize kiali config config := kconfig.NewConfig() - tracing.JaegerQueryUrl = s.ServiceMeshOptions.JaegerQueryHost + //tracing.JaegerQueryUrl = s.ServiceMeshOptions.JaegerQueryHost // Exclude system namespaces config.API.Namespaces.Exclude = []string{"istio-system", "kubesphere*", "kube*"} diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index d551d4b2..4e605fb8 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -7,9 +7,18 @@ import ( "github.com/emicklei/go-restful" "k8s.io/apimachinery/pkg/runtime/schema" urlruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/authentication/request/bearertoken" + "k8s.io/apiserver/pkg/authorization/authorizerfactory" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/klog" "kubesphere.io/kubesphere/pkg/api/iam" + "kubesphere.io/kubesphere/pkg/apiserver/authentication" + "kubesphere.io/kubesphere/pkg/apiserver/dispatch" + "kubesphere.io/kubesphere/pkg/apiserver/filters" + "kubesphere.io/kubesphere/pkg/apiserver/request" "kubesphere.io/kubesphere/pkg/informers" + devopsv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/devops/v1alpha2" iamv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/iam/v1alpha2" loggingv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/logging/v1alpha2" monitoringv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/monitoring/v1alpha2" @@ -18,14 +27,15 @@ import ( resourcesv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/resources/v1alpha2" resourcev1alpha3 "kubesphere.io/kubesphere/pkg/kapis/resources/v1alpha3" servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/servicemesh/metrics/v1alpha2" + tenantv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/tenant/v1alpha2" terminalv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/terminal/v1alpha2" "kubesphere.io/kubesphere/pkg/simple/client/cache" - "kubesphere.io/kubesphere/pkg/simple/client/db" "kubesphere.io/kubesphere/pkg/simple/client/devops" "kubesphere.io/kubesphere/pkg/simple/client/k8s" "kubesphere.io/kubesphere/pkg/simple/client/ldap" "kubesphere.io/kubesphere/pkg/simple/client/logging" "kubesphere.io/kubesphere/pkg/simple/client/monitoring" + "kubesphere.io/kubesphere/pkg/simple/client/mysql" "kubesphere.io/kubesphere/pkg/simple/client/openpitrix" "kubesphere.io/kubesphere/pkg/simple/client/s3" "net" @@ -85,7 +95,7 @@ type APIServer struct { S3Client s3.Interface // - DBClient db.Interface + DBClient *mysql.Client // LdapClient ldap.Interface @@ -104,26 +114,31 @@ func (s *APIServer) PrepareRun() error { s.installKubeSphereAPIs() + for _, ws := range s.container.RegisteredWebServices() { + klog.V(2).Infof("%s", ws.RootPath()) + } + + s.Server.Handler = s.container + + s.buildHandlerChain() + return nil } func (s *APIServer) installKubeSphereAPIs() { urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory)) - urlruntime.Must(servicemeshv1alpha2.AddToContainer(s.container)) - // Need to refactor devops api registration, too much dependencies - //urlruntime.Must(devopsv1alpha2.AddToContainer(s.container, s.DevopsClient)) - + urlruntime.Must(devopsv1alpha2.AddToContainer(s.container, s.DevopsClient, s.DBClient.Database(), nil, s.KubernetesClient.KubeSphere(), s.InformerFactory.KubeSphereSharedInformerFactory(), s.S3Client)) urlruntime.Must(loggingv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.LoggingClient)) urlruntime.Must(monitoringv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.MonitoringClient)) urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.OpenpitrixClient)) urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes())) urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory)) - //urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.InformerFactory, s.DBClient)) + urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.InformerFactory, s.DBClient.Database())) urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.KubernetesClient.Config())) urlruntime.Must(iamv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.InformerFactory, s.LdapClient, s.CacheClient, s.AuthenticateOptions)) - + urlruntime.Must(servicemeshv1alpha2.AddToContainer(s.container)) } func (s *APIServer) Run(stopCh <-chan struct{}) error { @@ -141,6 +156,7 @@ func (s *APIServer) Run(stopCh <-chan struct{}) error { _ = s.Server.Shutdown(ctx) }() + klog.V(0).Infof("Start listening on %s", s.Server.Addr) if s.Server.TLSConfig != nil { return s.Server.ListenAndServeTLS("", "") } else { @@ -148,6 +164,27 @@ func (s *APIServer) Run(stopCh <-chan struct{}) error { } } +func (s *APIServer) buildHandlerChain() { + requestInfoResolver := &request.RequestInfoFactory{ + APIPrefixes: sets.NewString("api", "apis", "kapis", "kapi"), + GrouplessAPIPrefixes: sets.NewString("api", "kapi"), + } + + failed := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + }) + + handler := s.Server.Handler + handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{}) + handler = filters.WithMultipleClusterDispatcher(handler, dispatch.DefaultClusterDispatch) + handler = filters.WithAuthorization(handler, authorizerfactory.NewAlwaysAllowAuthorizer()) + + handler = filters.WithAuthentication(handler, bearertoken.New(authentication.NewTokenAuthenticator(s.CacheClient)), failed) + handler = filters.WithRequestInfo(handler, requestInfoResolver) + + s.Server.Handler = handler +} + func (s *APIServer) waitForResourceSync(stopCh <-chan struct{}) error { klog.V(0).Info("Start cache objects") @@ -338,3 +375,10 @@ func getRequestIP(req *restful.Request) string { return address } + +type errorResponder struct{} + +func (e *errorResponder) Error(w http.ResponseWriter, req *http.Request, err error) { + klog.Error(err) + responsewriters.InternalError(w, req, err) +} diff --git a/pkg/apiserver/authentication/token_authenticator.go b/pkg/apiserver/authentication/token_authenticator.go new file mode 100644 index 00000000..dbad2ce9 --- /dev/null +++ b/pkg/apiserver/authentication/token_authenticator.go @@ -0,0 +1,29 @@ +package authentication + +import ( + "context" + "k8s.io/apiserver/pkg/authentication/authenticator" + "k8s.io/apiserver/pkg/authentication/user" + "kubesphere.io/kubesphere/pkg/simple/client/cache" +) + +type TokenAuthenticator struct { + cacheClient cache.Interface +} + +func NewTokenAuthenticator(cacheClient cache.Interface) authenticator.Token { + return &TokenAuthenticator{ + cacheClient: cacheClient, + } +} + +func (t *TokenAuthenticator) AuthenticateToken(ctx context.Context, token string) (*authenticator.Response, bool, error) { + return &authenticator.Response{ + User: &user.DefaultInfo{ + Name: "admin", + UID: "", + Groups: nil, + Extra: nil, + }, + }, true, nil +} diff --git a/pkg/apiserver/dispatch/dispatch.go b/pkg/apiserver/dispatch/dispatch.go new file mode 100644 index 00000000..8cf2b6ba --- /dev/null +++ b/pkg/apiserver/dispatch/dispatch.go @@ -0,0 +1,36 @@ +package dispatch + +import ( + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "net/http" + + "k8s.io/apimachinery/pkg/util/proxy" +) + +// Dispatcher defines how to forward request to desired cluster apiserver +type Dispatcher interface { + Dispatch(w http.ResponseWriter, req *http.Request) +} + +var DefaultClusterDispatch = newClusterDispatch() + +type clusterDispatch struct { + transport *http.Transport +} + +func newClusterDispatch() Dispatcher { + return &clusterDispatch{} +} + +func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request) { + + u := *req.URL + // u.Host = someHost + + httpProxy := proxy.NewUpgradeAwareHandler(&u, c.transport, false, false, c) + httpProxy.ServeHTTP(w, req) +} + +func (c *clusterDispatch) Error(w http.ResponseWriter, req *http.Request, err error) { + responsewriters.InternalError(w, req, err) +} diff --git a/pkg/apiserver/filters/authentication.go b/pkg/apiserver/filters/authentication.go new file mode 100644 index 00000000..7fce20bf --- /dev/null +++ b/pkg/apiserver/filters/authentication.go @@ -0,0 +1,33 @@ +package filters + +import ( + "k8s.io/apiserver/pkg/authentication/authenticator" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/klog" + "net/http" +) + +func WithAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler) http.Handler { + if auth == nil { + klog.Warningf("Authentication is disabled") + return handler + } + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + //authenticationStart := time.Now() + + resp, ok, err := auth.AuthenticateRequest(req) + if err != nil || !ok { + if err != nil { + klog.Errorf("Unable to authenticate the request due to error: %v", err) + } + failed.ServeHTTP(w, req) + return + } + + // authorization header is not required anymore in case of a successful authentication. + req.Header.Del("Authorization") + + req = req.WithContext(request.WithUser(req.Context(), resp.User)) + handler.ServeHTTP(w, req) + }) +} diff --git a/pkg/apiserver/filters/authorization.go b/pkg/apiserver/filters/authorization.go new file mode 100644 index 00000000..d76d79da --- /dev/null +++ b/pkg/apiserver/filters/authorization.go @@ -0,0 +1,71 @@ +package filters + +import ( + "context" + "errors" + "k8s.io/apiserver/pkg/authorization/authorizer" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + k8srequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/klog" + "kubesphere.io/kubesphere/pkg/apiserver/request" + "net/http" +) + +// WithAuthorization passes all authorized requests on to handler, and returns forbidden error otherwise. +func WithAuthorization(handler http.Handler, a authorizer.Authorizer) http.Handler { + if a == nil { + klog.Warningf("Authorization is disabled") + return handler + } + + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + + attributes, err := GetAuthorizerAttributes(ctx) + if err != nil { + responsewriters.InternalError(w, req, err) + } + + authorized, reason, err := a.Authorize(attributes) + if authorized == authorizer.DecisionAllow { + handler.ServeHTTP(w, req) + return + } + + if err != nil { + responsewriters.InternalError(w, req, err) + return + } + + klog.V(4).Infof("Forbidden: %#v, Reason: %q", req.RequestURI, reason) + w.WriteHeader(http.StatusForbidden) + }) +} + +func GetAuthorizerAttributes(ctx context.Context) (authorizer.Attributes, error) { + attribs := authorizer.AttributesRecord{} + + user, ok := k8srequest.UserFrom(ctx) + if ok { + attribs.User = user + } + + requestInfo, found := request.RequestInfoFrom(ctx) + if !found { + return nil, errors.New("no RequestInfo found in the context") + } + + // Start with common attributes that apply to resource and non-resource requests + attribs.ResourceRequest = requestInfo.IsResourceRequest + attribs.Path = requestInfo.Path + attribs.Verb = requestInfo.Verb + + attribs.APIGroup = requestInfo.APIGroup + attribs.APIVersion = requestInfo.APIVersion + attribs.Resource = requestInfo.Resource + attribs.Subresource = requestInfo.Subresource + attribs.Namespace = requestInfo.Namespace + attribs.Name = requestInfo.Name + + return &attribs, nil +} diff --git a/pkg/apiserver/filters/dispatch.go b/pkg/apiserver/filters/dispatch.go new file mode 100644 index 00000000..c893e6da --- /dev/null +++ b/pkg/apiserver/filters/dispatch.go @@ -0,0 +1,32 @@ +package filters + +import ( + "fmt" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/klog" + "kubesphere.io/kubesphere/pkg/apiserver/dispatch" + "kubesphere.io/kubesphere/pkg/apiserver/request" + "net/http" +) + +// Multiple cluster dispatcher forward request to desired cluster based on request cluster name +// which included in request path clusters/{cluster} +func WithMultipleClusterDispatcher(handler http.Handler, dispatch dispatch.Dispatcher) http.Handler { + if dispatch == nil { + klog.V(4).Infof("Multiple cluster dispatcher is disabled") + return handler + } + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + info, ok := request.RequestInfoFrom(req.Context()) + if !ok { + responsewriters.InternalError(w, req, fmt.Errorf("")) + return + } + + if info.Cluster == "" { + handler.ServeHTTP(w, req) + } else { + dispatch.Dispatch(w, req) + } + }) +} diff --git a/pkg/apiserver/filters/kubeapiserver.go b/pkg/apiserver/filters/kubeapiserver.go new file mode 100644 index 00000000..c7eb02cf --- /dev/null +++ b/pkg/apiserver/filters/kubeapiserver.go @@ -0,0 +1,44 @@ +package filters + +import ( + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/client-go/rest" + "k8s.io/klog" + "kubesphere.io/kubesphere/pkg/apiserver/request" + "kubesphere.io/kubesphere/pkg/server/errors" + "net/http" + "net/url" + + "k8s.io/apimachinery/pkg/util/proxy" +) + +// WithKubeAPIServer proxy request to kubernetes service if requests path starts with /api +func WithKubeAPIServer(handler http.Handler, config *rest.Config, failed proxy.ErrorResponder) http.Handler { + kubernetes, _ := url.Parse(config.Host) + defaultTransport, err := rest.TransportFor(config) + if err != nil { + klog.Errorf("Unable to create transport from rest.Config: %v", err) + return handler + } + + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + info, ok := request.RequestInfoFrom(req.Context()) + if !ok { + err := errors.New("Unable to retrieve request info from request") + klog.Error(err) + responsewriters.InternalError(w, req, err) + } + + if info.IsKubernetesRequest { + s := *req.URL + s.Host = kubernetes.Host + s.Scheme = kubernetes.Scheme + + httpProxy := proxy.NewUpgradeAwareHandler(&s, defaultTransport, true, false, failed) + httpProxy.ServeHTTP(w, req) + return + } + + handler.ServeHTTP(w, req) + }) +} diff --git a/pkg/apiserver/filters/requestinfo.go b/pkg/apiserver/filters/requestinfo.go new file mode 100644 index 00000000..c1d605b3 --- /dev/null +++ b/pkg/apiserver/filters/requestinfo.go @@ -0,0 +1,22 @@ +package filters + +import ( + "fmt" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "kubesphere.io/kubesphere/pkg/apiserver/request" + "net/http" +) + +func WithRequestInfo(handler http.Handler, resolver request.RequestInfoResolver) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + info, err := resolver.NewRequestInfo(req) + if err != nil { + responsewriters.InternalError(w, req, fmt.Errorf("failed to crate RequestInfo: %v", err)) + return + } + + req = req.WithContext(request.WithRequestInfo(ctx, info)) + handler.ServeHTTP(w, req) + }) +} diff --git a/pkg/apiserver/request/requestinfo.go b/pkg/apiserver/request/requestinfo.go new file mode 100644 index 00000000..4a85ff99 --- /dev/null +++ b/pkg/apiserver/request/requestinfo.go @@ -0,0 +1,173 @@ +package request + +import ( + "context" + "fmt" + "k8s.io/apimachinery/pkg/util/sets" + "net/http" + "strings" + + k8srequest "k8s.io/apiserver/pkg/endpoints/request" +) + +type RequestInfoResolver interface { + NewRequestInfo(req *http.Request) (*RequestInfo, error) +} + +// specialVerbs contains just strings which are used in REST paths for special actions that don't fall under the normal +// CRUDdy GET/POST/PUT/DELETE actions on REST objects. +// master's Mux. +var specialVerbs = sets.NewString("proxy", "watch") + +var kubernetesAPIPrefixes = sets.NewString("api", "apis") + +// RequestInfo holds information parsed from the http.Request, +// extended from k8s.io/apiserver/pkg/endpoints/request/requestinfo.go +type RequestInfo struct { + *k8srequest.RequestInfo + + // IsKubeSphereRequest indicates whether or not the request should be handled by kubernetes or kubesphere + IsKubernetesRequest bool + + // Workspace of requested namespace, for non-workspaced resources, this may be empty + Workspace string + + // Cluster of requested resource, this is empty in single-cluster environment + Cluster string +} + +type RequestInfoFactory struct { + APIPrefixes sets.String + GrouplessAPIPrefixes sets.String + k8sRequestInfoFactory *k8srequest.RequestInfoFactory +} + +// NewRequestInfo returns the information from the http request. If error is not nil, RequestInfo holds the information as best it is known before the failure +// It handles both resource and non-resource requests and fills in all the pertinent information for each. +// Valid Inputs: +// +// /apis/{api-group}/{version}/namespaces +// /api/{version}/namespaces +// /api/{version}/namespaces/{namespace} +// /api/{version}/namespaces/{namespace}/{resource} +// /api/{version}/namespaces/{namespace}/{resource}/{resourceName} +// /api/{version}/{resource} +// /api/{version}/{resource}/{resourceName} +// +// Special verbs without subresources: +// /api/{version}/proxy/{resource}/{resourceName} +// /api/{version}/proxy/namespaces/{namespace}/{resource}/{resourceName} +// +// Special verbs with subresources: +// /api/{version}/watch/{resource} +// /api/{version}/watch/namespaces/{namespace}/{resource} +// +// /kapis/{api-group}/{version}/workspaces/{workspace}/{resource}/{resourceName} +// / +// /kapis/{api-group}/{version}/namespaces/{namespace}/{resource} +// /kapis/{api-group}/{version}/namespaces/{namespace}/{resource}/{resourceName} +// With workspaces: +// /kapis/{api-group}/{version}/clusters/{cluster}/namespaces/{namespace}/{resource} +// /kapis/{api-group}/{version}/clusters/{cluster}/namespaces/{namespace}/{resource}/{resourceName} +// +func (r *RequestInfoFactory) NewRequestInfo(req *http.Request) (*RequestInfo, error) { + + requestInfo := RequestInfo{ + IsKubernetesRequest: false, + RequestInfo: &k8srequest.RequestInfo{ + Path: req.URL.Path, + Verb: req.Method, + }, + } + + defer func() { + if kubernetesAPIPrefixes.Has(requestInfo.APIPrefix) { + requestInfo.IsKubernetesRequest = true + } + }() + + currentParts := splitPath(req.URL.Path) + if len(currentParts) < 3 { + return &requestInfo, nil + } + + if !r.APIPrefixes.Has(currentParts[0]) { + // return a non-resource request + return &requestInfo, nil + } + requestInfo.APIPrefix = currentParts[0] + currentParts = currentParts[1:] + + if !r.GrouplessAPIPrefixes.Has(requestInfo.APIPrefix) { + if len(currentParts) < 2 { + return &requestInfo, nil + } + + if currentParts[0] == "clusters" { + requestInfo.Cluster = currentParts[1] + currentParts = currentParts[2:] + } + + if len(currentParts) < 3 { + return &requestInfo, nil + } + + requestInfo.APIGroup = currentParts[0] + currentParts = currentParts[1:] + } + + requestInfo.IsResourceRequest = true + requestInfo.APIVersion = currentParts[0] + currentParts = currentParts[1:] + + if specialVerbs.Has(currentParts[0]) { + if len(currentParts) < 2 { + return &requestInfo, fmt.Errorf("unable to determine kind and namespace from url: %v", req.URL) + } + + requestInfo.Verb = currentParts[0] + currentParts = currentParts[1:] + } else { + switch req.Method { + case "POST": + requestInfo.Verb = "create" + case "GET", "HEAD": + requestInfo.Verb = "get" + case "PUT": + requestInfo.Verb = "update" + case "PATCH": + requestInfo.Verb = "patch" + case "DELETE": + requestInfo.Verb = "delete" + default: + requestInfo.Verb = "" + } + } + + return &requestInfo, nil +} + +type requestInfoKeyType int + +// requestInfoKey is the RequestInfo key for the context. It's of private type here. Because +// keys are interfaces and interfaces are equal when the type and the value is equal, this +// does not conflict with the keys defined in pkg/api. +const requestInfoKey requestInfoKeyType = iota + +func WithRequestInfo(parent context.Context, info *RequestInfo) context.Context { + return k8srequest.WithValue(parent, requestInfoKey, info) +} + +func RequestInfoFrom(ctx context.Context) (*RequestInfo, bool) { + info, ok := ctx.Value(requestInfoKey).(*RequestInfo) + return info, ok +} + +// splitPath returns the segments for a URL path. +func splitPath(path string) []string { + path = strings.Trim(path, "/") + if path == "" { + return []string{} + } + return strings.Split(path, "/") +} diff --git a/pkg/apiserver/server/handler.go b/pkg/apiserver/server/handler.go new file mode 100644 index 00000000..abb4e431 --- /dev/null +++ b/pkg/apiserver/server/handler.go @@ -0,0 +1 @@ +package server diff --git a/pkg/apiserver/servicemesh/metrics/handlers.go b/pkg/apiserver/servicemesh/metrics/handlers.go deleted file mode 100644 index 249c22d8..00000000 --- a/pkg/apiserver/servicemesh/metrics/handlers.go +++ /dev/null @@ -1,70 +0,0 @@ -package metrics - -import ( - "fmt" - "github.com/emicklei/go-restful" - "github.com/kiali/kiali/handlers" -) - -// Get app metrics -func GetAppMetrics(request *restful.Request, response *restful.Response) { - handlers.AppMetrics(request, response) -} - -// Get workload metrics -func GetWorkloadMetrics(request *restful.Request, response *restful.Response) { - namespace := request.PathParameter("namespace") - workload := request.PathParameter("workload") - - if len(namespace) > 0 && len(workload) > 0 { - request.Request.URL.RawQuery = fmt.Sprintf("%s&namespaces=%s&workload=%s", request.Request.URL.RawQuery, namespace, workload) - } - - handlers.WorkloadMetrics(request, response) -} - -// Get service metrics -func GetServiceMetrics(request *restful.Request, response *restful.Response) { - handlers.ServiceMetrics(request, response) -} - -// Get namespace metrics -func GetNamespaceMetrics(request *restful.Request, response *restful.Response) { - handlers.NamespaceMetrics(request, response) -} - -// Get service graph for namespace -func GetNamespaceGraph(request *restful.Request, response *restful.Response) { - namespace := request.PathParameter("namespace") - - if len(namespace) > 0 { - request.Request.URL.RawQuery = fmt.Sprintf("%s&namespaces=%s", request.Request.URL.RawQuery, namespace) - } - - handlers.GetNamespaceGraph(request, response) -} - -// Get service graph for namespaces -func GetNamespacesGraph(request *restful.Request, response *restful.Response) { - handlers.GraphNamespaces(request, response) -} - -// Get namespace health -func GetNamespaceHealth(request *restful.Request, response *restful.Response) { - handlers.NamespaceHealth(request, response) -} - -// Get workload health -func GetWorkloadHealth(request *restful.Request, response *restful.Response) { - handlers.WorkloadHealth(request, response) -} - -// Get app health -func GetAppHealth(request *restful.Request, response *restful.Response) { - handlers.AppHealth(request, response) -} - -// Get service health -func GetServiceHealth(request *restful.Request, response *restful.Response) { - handlers.ServiceHealth(request, response) -} diff --git a/pkg/apiserver/servicemesh/tracing/handlers.go b/pkg/apiserver/servicemesh/tracing/handlers.go deleted file mode 100644 index be6adaaa..00000000 --- a/pkg/apiserver/servicemesh/tracing/handlers.go +++ /dev/null @@ -1,45 +0,0 @@ -package tracing - -import ( - "fmt" - "github.com/emicklei/go-restful" - "io/ioutil" - "log" - "net/http" -) - -var JaegerQueryUrl = "http://jaeger-query.istio-system.svc:16686/jaeger" - -func GetServiceTracing(request *restful.Request, response *restful.Response) { - namespace := request.PathParameter("namespace") - service := request.PathParameter("service") - - serviceName := fmt.Sprintf("%s.%s", service, namespace) - - url := fmt.Sprintf("%s/api/traces?%s&service=%s", JaegerQueryUrl, request.Request.URL.RawQuery, serviceName) - - resp, err := http.Get(url) - - if err != nil { - log.Printf("query jaeger faile with err %v", err) - response.WriteError(http.StatusInternalServerError, err) - return - } - - body, err := ioutil.ReadAll(resp.Body) - defer resp.Body.Close() - - if err != nil { - log.Printf("read response error : %v", err) - response.WriteError(http.StatusInternalServerError, err) - return - } - - // need to set header for proper response - response.Header().Set("Content-Type", "application/json") - _, err = response.Write(body) - - if err != nil { - log.Printf("write response failed %v", err) - } -} diff --git a/pkg/kapis/resources/v1alpha3/register.go b/pkg/kapis/resources/v1alpha3/register.go index 9cb06d10..7d80e879 100644 --- a/pkg/kapis/resources/v1alpha3/register.go +++ b/pkg/kapis/resources/v1alpha3/register.go @@ -38,7 +38,7 @@ const ( ok = "OK" ) -var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha2"} +var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha3"} func AddToContainer(c *restful.Container, informerFactory informers.InformerFactory) error { diff --git a/pkg/kapis/servicemesh/metrics/v1alpha2/handler.go b/pkg/kapis/servicemesh/metrics/v1alpha2/handler.go index aaaf31cd..678dac8c 100644 --- a/pkg/kapis/servicemesh/metrics/v1alpha2/handler.go +++ b/pkg/kapis/servicemesh/metrics/v1alpha2/handler.go @@ -1 +1,110 @@ package v1alpha2 + +import ( + "fmt" + "github.com/emicklei/go-restful" + "github.com/kiali/kiali/handlers" + "io/ioutil" + "k8s.io/klog" + "kubesphere.io/kubesphere/pkg/api" + "net/http" +) + +var JaegerQueryUrl = "http://jaeger-query.istio-system.svc:16686/jaeger" + +// Get app metrics +func getAppMetrics(request *restful.Request, response *restful.Response) { + handlers.AppMetrics(request, response) +} + +// Get workload metrics +func getWorkloadMetrics(request *restful.Request, response *restful.Response) { + namespace := request.PathParameter("namespace") + workload := request.PathParameter("workload") + + if len(namespace) > 0 && len(workload) > 0 { + request.Request.URL.RawQuery = fmt.Sprintf("%s&namespaces=%s&workload=%s", request.Request.URL.RawQuery, namespace, workload) + } + + handlers.WorkloadMetrics(request, response) +} + +// Get service metrics +func getServiceMetrics(request *restful.Request, response *restful.Response) { + handlers.ServiceMetrics(request, response) +} + +// Get namespace metrics +func getNamespaceMetrics(request *restful.Request, response *restful.Response) { + handlers.NamespaceMetrics(request, response) +} + +// Get service graph for namespace +func getNamespaceGraph(request *restful.Request, response *restful.Response) { + namespace := request.PathParameter("namespace") + + if len(namespace) > 0 { + request.Request.URL.RawQuery = fmt.Sprintf("%s&namespaces=%s", request.Request.URL.RawQuery, namespace) + } + + handlers.GetNamespaceGraph(request, response) +} + +// Get service graph for namespaces +func getNamespacesGraph(request *restful.Request, response *restful.Response) { + handlers.GraphNamespaces(request, response) +} + +// Get namespace health +func getNamespaceHealth(request *restful.Request, response *restful.Response) { + handlers.NamespaceHealth(request, response) +} + +// Get workload health +func getWorkloadHealth(request *restful.Request, response *restful.Response) { + handlers.WorkloadHealth(request, response) +} + +// Get app health +func getAppHealth(request *restful.Request, response *restful.Response) { + handlers.AppHealth(request, response) +} + +// Get service health +func getServiceHealth(request *restful.Request, response *restful.Response) { + handlers.ServiceHealth(request, response) +} + +func getServiceTracing(request *restful.Request, response *restful.Response) { + namespace := request.PathParameter("namespace") + service := request.PathParameter("service") + + serviceName := fmt.Sprintf("%s.%s", service, namespace) + + url := fmt.Sprintf("%s/api/traces?%s&service=%s", JaegerQueryUrl, request.Request.URL.RawQuery, serviceName) + + resp, err := http.Get(url) + + if err != nil { + klog.Errorf("query jaeger faile with err %v", err) + api.HandleInternalError(response, err) + return + } + + body, err := ioutil.ReadAll(resp.Body) + defer resp.Body.Close() + + if err != nil { + klog.Errorf("read response error : %v", err) + api.HandleInternalError(response, err) + return + } + + // need to set header for proper response + response.Header().Set("Content-Type", "application/json") + _, err = response.Write(body) + + if err != nil { + klog.Errorf("write response failed %v", err) + } +} diff --git a/pkg/kapis/servicemesh/metrics/v1alpha2/register.go b/pkg/kapis/servicemesh/metrics/v1alpha2/register.go index 40d9b7dc..b20a7af5 100644 --- a/pkg/kapis/servicemesh/metrics/v1alpha2/register.go +++ b/pkg/kapis/servicemesh/metrics/v1alpha2/register.go @@ -5,8 +5,6 @@ import ( "github.com/emicklei/go-restful-openapi" "k8s.io/apimachinery/pkg/runtime/schema" "kubesphere.io/kubesphere/pkg/apiserver/runtime" - "kubesphere.io/kubesphere/pkg/apiserver/servicemesh/metrics" - "kubesphere.io/kubesphere/pkg/apiserver/servicemesh/tracing" "net/http" ) @@ -14,12 +12,7 @@ const GroupName = "servicemesh.kubesphere.io" var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha2"} -var ( - WebServiceBuilder = runtime.NewContainerBuilder(addWebService) - AddToContainer = WebServiceBuilder.AddToContainer -) - -func addWebService(c *restful.Container) error { +func AddToContainer(c *restful.Container) error { tags := []string{"ServiceMesh"} @@ -28,7 +21,7 @@ func addWebService(c *restful.Container) error { // Get service metrics // GET /namespaces/{namespace}/services/{service}/metrics webservice.Route(webservice.GET("/namespaces/{namespace}/services/{service}/metrics"). - To(metrics.GetServiceMetrics). + To(getServiceMetrics). Metadata(restfulspec.KeyOpenAPITags, tags). Doc("Get service metrics from a specific namespace"). Param(webservice.PathParameter("namespace", "name of the namespace")). @@ -49,7 +42,7 @@ func addWebService(c *restful.Container) error { // Get app metrics // Get /namespaces/{namespace}/apps/{app}/metrics webservice.Route(webservice.GET("/namespaces/{namespace}/apps/{app}/metrics"). - To(metrics.GetAppMetrics). + To(getAppMetrics). Metadata(restfulspec.KeyOpenAPITags, tags). Doc("Get app metrics from a specific namespace"). Param(webservice.PathParameter("namespace", "name of the namespace")). @@ -71,7 +64,7 @@ func addWebService(c *restful.Container) error { // Get workload metrics // Get /namespaces/{namespace}/workloads/{workload}/metrics webservice.Route(webservice.GET("/namespaces/{namespace}/workloads/{workload}/metrics"). - To(metrics.GetWorkloadMetrics). + To(getWorkloadMetrics). Metadata(restfulspec.KeyOpenAPITags, tags). Doc("Get workload metrics from a specific namespace"). Param(webservice.PathParameter("namespace", "name of the namespace").Required(true)). @@ -94,7 +87,7 @@ func addWebService(c *restful.Container) error { // Get namespace metrics // Get /namespaces/{namespace}/metrics webservice.Route(webservice.GET("/namespaces/{namespace}/metrics"). - To(metrics.GetNamespaceMetrics). + To(getNamespaceMetrics). Metadata(restfulspec.KeyOpenAPITags, tags). Doc("Get metrics from a specific namespace"). Param(webservice.PathParameter("namespace", "name of the namespace").Required(true)). @@ -115,7 +108,7 @@ func addWebService(c *restful.Container) error { // Get namespace graph // Get /namespaces/{namespace}/graph webservice.Route(webservice.GET("/namespaces/{namespace}/graph"). - To(metrics.GetNamespaceGraph). + To(getNamespaceGraph). Metadata(restfulspec.KeyOpenAPITags, tags). Doc("Get service graph for a specific namespace"). Param(webservice.PathParameter("namespace", "name of a namespace").Required(true)). @@ -132,7 +125,7 @@ func addWebService(c *restful.Container) error { // Get namespaces graph, for multiple namespaces // Get /namespaces/graph webservice.Route(webservice.GET("/namespaces/graph"). - To(metrics.GetNamespacesGraph). + To(getNamespacesGraph). Metadata(restfulspec.KeyOpenAPITags, tags). Doc("Get graph from all namespaces"). Param(webservice.QueryParameter("duration", "duration of the query period, in seconds").DefaultValue("10m")). @@ -147,7 +140,7 @@ func addWebService(c *restful.Container) error { // Get namespace health webservice.Route(webservice.GET("/namespaces/{namespace}/health"). - To(metrics.GetNamespaceHealth). + To(getNamespaceHealth). Metadata(restfulspec.KeyOpenAPITags, tags). Doc("Get app/service/workload health of a namespace"). Param(webservice.PathParameter("namespace", "name of a namespace").Required(true)). @@ -161,7 +154,7 @@ func addWebService(c *restful.Container) error { // Get workloads health webservice.Route(webservice.GET("/namespaces/{namespace}/workloads/{workload}/health"). - To(metrics.GetWorkloadHealth). + To(getWorkloadHealth). Metadata(restfulspec.KeyOpenAPITags, tags). Doc("Get workload health"). Param(webservice.PathParameter("namespace", "name of a namespace").Required(true)). @@ -173,7 +166,7 @@ func addWebService(c *restful.Container) error { // Get app health webservice.Route(webservice.GET("/namespaces/{namespace}/apps/{app}/health"). - To(metrics.GetAppHealth). + To(getAppHealth). Metadata(restfulspec.KeyOpenAPITags, tags). Doc("Get app health"). Param(webservice.PathParameter("namespace", "name of a namespace").Required(true)). @@ -185,7 +178,7 @@ func addWebService(c *restful.Container) error { // Get service health webservice.Route(webservice.GET("/namespaces/{namespace}/services/{service}/health"). - To(metrics.GetServiceHealth). + To(getServiceHealth). Metadata(restfulspec.KeyOpenAPITags, tags). Doc("Get service health"). Param(webservice.PathParameter("namespace", "name of a namespace").Required(true)). @@ -197,7 +190,7 @@ func addWebService(c *restful.Container) error { // Get service tracing webservice.Route(webservice.GET("/namespaces/{namespace}/services/{service}/traces"). - To(tracing.GetServiceTracing). + To(getServiceTracing). Doc("Get tracing of a service, should have servicemesh enabled first"). Metadata(restfulspec.KeyOpenAPITags, tags). Param(webservice.PathParameter("namespace", "namespace of service").Required(true)). diff --git a/pkg/models/resources/v1alpha2/resource/resources.go b/pkg/models/resources/v1alpha2/resource/resources.go index de91c2d2..bdd9c6e8 100644 --- a/pkg/models/resources/v1alpha2/resource/resources.go +++ b/pkg/models/resources/v1alpha2/resource/resources.go @@ -53,12 +53,16 @@ type ResourceGetter struct { } func (r ResourceGetter) Add(resource string, getter v1alpha2.Interface) { + if r.resourcesGetters == nil { + r.resourcesGetters = make(map[string]v1alpha2.Interface) + } r.resourcesGetters[resource] = getter } func NewResourceGetter(factory informers.InformerFactory) *ResourceGetter { resourceGetters := make(map[string]v1alpha2.Interface) + //resourceGetters[v1alpha2.Deployments] = deployments resourceGetters[v1alpha2.ConfigMaps] = configmap.NewConfigmapSearcher(factory.KubernetesSharedInformerFactory()) resourceGetters[v1alpha2.CronJobs] = cronjob.NewCronJobSearcher(factory.KubernetesSharedInformerFactory()) resourceGetters[v1alpha2.DaemonSets] = daemonset.NewDaemonSetSearcher(factory.KubernetesSharedInformerFactory()) @@ -135,7 +139,7 @@ func (r *ResourceGetter) ListResources(namespace, resource string, conditions *p limit = len(result) - offset } - result = result[offset : offset+limit] + items = result[offset : offset+limit] return &models.PageableResponse{TotalCount: len(result), Items: items}, nil } diff --git a/pkg/models/routers/routers.go b/pkg/models/routers/routers.go index f4c1e00b..20ba1520 100644 --- a/pkg/models/routers/routers.go +++ b/pkg/models/routers/routers.go @@ -66,7 +66,7 @@ func NewRouterOperator(client kubernetes.Interface, informers informers.SharedIn routerTemplates := make(map[string]runtime.Object, 2) if err != nil { - klog.Fatalf("error happened during loading external yamls, %v", err) + klog.Errorf("error happened during loading external yamls, %v", err) } for _, f := range yamls { diff --git a/pkg/server/filter/logging.go b/pkg/server/filter/logging.go deleted file mode 100644 index 150c0e04..00000000 --- a/pkg/server/filter/logging.go +++ /dev/null @@ -1,61 +0,0 @@ -/* - - Copyright 2019 The KubeSphere Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - -*/ - -package filter - -import ( - "k8s.io/klog" - "net" - "strings" - "time" - - "github.com/emicklei/go-restful" -) - -func Logging(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) { - start := time.Now() - chain.ProcessFilter(req, resp) - klog.V(4).Infof("%s - \"%s %s %s\" %d %d %dms", - getRequestIP(req), - req.Request.Method, - req.Request.RequestURI, - req.Request.Proto, - resp.StatusCode(), - resp.ContentLength(), - time.Since(start)/time.Millisecond, - ) -} - -func getRequestIP(req *restful.Request) string { - address := strings.Trim(req.Request.Header.Get("X-Real-Ip"), " ") - if address != "" { - return address - } - - address = strings.Trim(req.Request.Header.Get("X-Forwarded-For"), " ") - if address != "" { - return address - } - - address, _, err := net.SplitHostPort(req.Request.RemoteAddr) - if err != nil { - return req.Request.RemoteAddr - } - - return address -} diff --git a/pkg/server/server.go b/pkg/server/server.go deleted file mode 100644 index d6a1aaea..00000000 --- a/pkg/server/server.go +++ /dev/null @@ -1,24 +0,0 @@ -package server - -import ( - "bytes" - "fmt" - "k8s.io/klog" - "net/http" - "runtime" -) - -func LogStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter) { - var buffer bytes.Buffer - buffer.WriteString(fmt.Sprintf("recover from panic situation: - %v\r\n", panicReason)) - for i := 2; ; i += 1 { - _, file, line, ok := runtime.Caller(i) - if !ok { - break - } - buffer.WriteString(fmt.Sprintf(" %s:%d\r\n", file, line)) - } - klog.Error(buffer.String()) - httpWriter.WriteHeader(http.StatusInternalServerError) - httpWriter.Write([]byte("recover from panic situation")) -} diff --git a/vendor/github.com/mxk/go-flowrate/LICENSE b/vendor/github.com/mxk/go-flowrate/LICENSE new file mode 100644 index 00000000..e9f9f628 --- /dev/null +++ b/vendor/github.com/mxk/go-flowrate/LICENSE @@ -0,0 +1,29 @@ +Copyright (c) 2014 The Go-FlowRate Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the + distribution. + + * Neither the name of the go-flowrate project nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/mxk/go-flowrate/flowrate/flowrate.go b/vendor/github.com/mxk/go-flowrate/flowrate/flowrate.go new file mode 100644 index 00000000..1b727721 --- /dev/null +++ b/vendor/github.com/mxk/go-flowrate/flowrate/flowrate.go @@ -0,0 +1,267 @@ +// +// Written by Maxim Khitrov (November 2012) +// + +// Package flowrate provides the tools for monitoring and limiting the flow rate +// of an arbitrary data stream. +package flowrate + +import ( + "math" + "sync" + "time" +) + +// Monitor monitors and limits the transfer rate of a data stream. +type Monitor struct { + mu sync.Mutex // Mutex guarding access to all internal fields + active bool // Flag indicating an active transfer + start time.Duration // Transfer start time (clock() value) + bytes int64 // Total number of bytes transferred + samples int64 // Total number of samples taken + + rSample float64 // Most recent transfer rate sample (bytes per second) + rEMA float64 // Exponential moving average of rSample + rPeak float64 // Peak transfer rate (max of all rSamples) + rWindow float64 // rEMA window (seconds) + + sBytes int64 // Number of bytes transferred since sLast + sLast time.Duration // Most recent sample time (stop time when inactive) + sRate time.Duration // Sampling rate + + tBytes int64 // Number of bytes expected in the current transfer + tLast time.Duration // Time of the most recent transfer of at least 1 byte +} + +// New creates a new flow control monitor. Instantaneous transfer rate is +// measured and updated for each sampleRate interval. windowSize determines the +// weight of each sample in the exponential moving average (EMA) calculation. +// The exact formulas are: +// +// sampleTime = currentTime - prevSampleTime +// sampleRate = byteCount / sampleTime +// weight = 1 - exp(-sampleTime/windowSize) +// newRate = weight*sampleRate + (1-weight)*oldRate +// +// The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s, +// respectively. +func New(sampleRate, windowSize time.Duration) *Monitor { + if sampleRate = clockRound(sampleRate); sampleRate <= 0 { + sampleRate = 5 * clockRate + } + if windowSize <= 0 { + windowSize = 1 * time.Second + } + now := clock() + return &Monitor{ + active: true, + start: now, + rWindow: windowSize.Seconds(), + sLast: now, + sRate: sampleRate, + tLast: now, + } +} + +// Update records the transfer of n bytes and returns n. It should be called +// after each Read/Write operation, even if n is 0. +func (m *Monitor) Update(n int) int { + m.mu.Lock() + m.update(n) + m.mu.Unlock() + return n +} + +// IO is a convenience method intended to wrap io.Reader and io.Writer method +// execution. It calls m.Update(n) and then returns (n, err) unmodified. +func (m *Monitor) IO(n int, err error) (int, error) { + return m.Update(n), err +} + +// Done marks the transfer as finished and prevents any further updates or +// limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and +// Limit methods become NOOPs. It returns the total number of bytes transferred. +func (m *Monitor) Done() int64 { + m.mu.Lock() + if now := m.update(0); m.sBytes > 0 { + m.reset(now) + } + m.active = false + m.tLast = 0 + n := m.bytes + m.mu.Unlock() + return n +} + +// timeRemLimit is the maximum Status.TimeRem value. +const timeRemLimit = 999*time.Hour + 59*time.Minute + 59*time.Second + +// Status represents the current Monitor status. All transfer rates are in bytes +// per second rounded to the nearest byte. +type Status struct { + Active bool // Flag indicating an active transfer + Start time.Time // Transfer start time + Duration time.Duration // Time period covered by the statistics + Idle time.Duration // Time since the last transfer of at least 1 byte + Bytes int64 // Total number of bytes transferred + Samples int64 // Total number of samples taken + InstRate int64 // Instantaneous transfer rate + CurRate int64 // Current transfer rate (EMA of InstRate) + AvgRate int64 // Average transfer rate (Bytes / Duration) + PeakRate int64 // Maximum instantaneous transfer rate + BytesRem int64 // Number of bytes remaining in the transfer + TimeRem time.Duration // Estimated time to completion + Progress Percent // Overall transfer progress +} + +// Status returns current transfer status information. The returned value +// becomes static after a call to Done. +func (m *Monitor) Status() Status { + m.mu.Lock() + now := m.update(0) + s := Status{ + Active: m.active, + Start: clockToTime(m.start), + Duration: m.sLast - m.start, + Idle: now - m.tLast, + Bytes: m.bytes, + Samples: m.samples, + PeakRate: round(m.rPeak), + BytesRem: m.tBytes - m.bytes, + Progress: percentOf(float64(m.bytes), float64(m.tBytes)), + } + if s.BytesRem < 0 { + s.BytesRem = 0 + } + if s.Duration > 0 { + rAvg := float64(s.Bytes) / s.Duration.Seconds() + s.AvgRate = round(rAvg) + if s.Active { + s.InstRate = round(m.rSample) + s.CurRate = round(m.rEMA) + if s.BytesRem > 0 { + if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 { + ns := float64(s.BytesRem) / tRate * 1e9 + if ns > float64(timeRemLimit) { + ns = float64(timeRemLimit) + } + s.TimeRem = clockRound(time.Duration(ns)) + } + } + } + } + m.mu.Unlock() + return s +} + +// Limit restricts the instantaneous (per-sample) data flow to rate bytes per +// second. It returns the maximum number of bytes (0 <= n <= want) that may be +// transferred immediately without exceeding the limit. If block == true, the +// call blocks until n > 0. want is returned unmodified if want < 1, rate < 1, +// or the transfer is inactive (after a call to Done). +// +// At least one byte is always allowed to be transferred in any given sampling +// period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate +// is 10 bytes per second. +// +// For usage examples, see the implementation of Reader and Writer in io.go. +func (m *Monitor) Limit(want int, rate int64, block bool) (n int) { + if want < 1 || rate < 1 { + return want + } + m.mu.Lock() + + // Determine the maximum number of bytes that can be sent in one sample + limit := round(float64(rate) * m.sRate.Seconds()) + if limit <= 0 { + limit = 1 + } + + // If block == true, wait until m.sBytes < limit + if now := m.update(0); block { + for m.sBytes >= limit && m.active { + now = m.waitNextSample(now) + } + } + + // Make limit <= want (unlimited if the transfer is no longer active) + if limit -= m.sBytes; limit > int64(want) || !m.active { + limit = int64(want) + } + m.mu.Unlock() + + if limit < 0 { + limit = 0 + } + return int(limit) +} + +// SetTransferSize specifies the total size of the data transfer, which allows +// the Monitor to calculate the overall progress and time to completion. +func (m *Monitor) SetTransferSize(bytes int64) { + if bytes < 0 { + bytes = 0 + } + m.mu.Lock() + m.tBytes = bytes + m.mu.Unlock() +} + +// update accumulates the transferred byte count for the current sample until +// clock() - m.sLast >= m.sRate. The monitor status is updated once the current +// sample is done. +func (m *Monitor) update(n int) (now time.Duration) { + if !m.active { + return + } + if now = clock(); n > 0 { + m.tLast = now + } + m.sBytes += int64(n) + if sTime := now - m.sLast; sTime >= m.sRate { + t := sTime.Seconds() + if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak { + m.rPeak = m.rSample + } + + // Exponential moving average using a method similar to *nix load + // average calculation. Longer sampling periods carry greater weight. + if m.samples > 0 { + w := math.Exp(-t / m.rWindow) + m.rEMA = m.rSample + w*(m.rEMA-m.rSample) + } else { + m.rEMA = m.rSample + } + m.reset(now) + } + return +} + +// reset clears the current sample state in preparation for the next sample. +func (m *Monitor) reset(sampleTime time.Duration) { + m.bytes += m.sBytes + m.samples++ + m.sBytes = 0 + m.sLast = sampleTime +} + +// waitNextSample sleeps for the remainder of the current sample. The lock is +// released and reacquired during the actual sleep period, so it's possible for +// the transfer to be inactive when this method returns. +func (m *Monitor) waitNextSample(now time.Duration) time.Duration { + const minWait = 5 * time.Millisecond + current := m.sLast + + // sleep until the last sample time changes (ideally, just one iteration) + for m.sLast == current && m.active { + d := current + m.sRate - now + m.mu.Unlock() + if d < minWait { + d = minWait + } + time.Sleep(d) + m.mu.Lock() + now = m.update(0) + } + return now +} diff --git a/vendor/github.com/mxk/go-flowrate/flowrate/io.go b/vendor/github.com/mxk/go-flowrate/flowrate/io.go new file mode 100644 index 00000000..fbe09097 --- /dev/null +++ b/vendor/github.com/mxk/go-flowrate/flowrate/io.go @@ -0,0 +1,133 @@ +// +// Written by Maxim Khitrov (November 2012) +// + +package flowrate + +import ( + "errors" + "io" +) + +// ErrLimit is returned by the Writer when a non-blocking write is short due to +// the transfer rate limit. +var ErrLimit = errors.New("flowrate: flow rate limit exceeded") + +// Limiter is implemented by the Reader and Writer to provide a consistent +// interface for monitoring and controlling data transfer. +type Limiter interface { + Done() int64 + Status() Status + SetTransferSize(bytes int64) + SetLimit(new int64) (old int64) + SetBlocking(new bool) (old bool) +} + +// Reader implements io.ReadCloser with a restriction on the rate of data +// transfer. +type Reader struct { + io.Reader // Data source + *Monitor // Flow control monitor + + limit int64 // Rate limit in bytes per second (unlimited when <= 0) + block bool // What to do when no new bytes can be read due to the limit +} + +// NewReader restricts all Read operations on r to limit bytes per second. +func NewReader(r io.Reader, limit int64) *Reader { + return &Reader{r, New(0, 0), limit, true} +} + +// Read reads up to len(p) bytes into p without exceeding the current transfer +// rate limit. It returns (0, nil) immediately if r is non-blocking and no new +// bytes can be read at this time. +func (r *Reader) Read(p []byte) (n int, err error) { + p = p[:r.Limit(len(p), r.limit, r.block)] + if len(p) > 0 { + n, err = r.IO(r.Reader.Read(p)) + } + return +} + +// SetLimit changes the transfer rate limit to new bytes per second and returns +// the previous setting. +func (r *Reader) SetLimit(new int64) (old int64) { + old, r.limit = r.limit, new + return +} + +// SetBlocking changes the blocking behavior and returns the previous setting. A +// Read call on a non-blocking reader returns immediately if no additional bytes +// may be read at this time due to the rate limit. +func (r *Reader) SetBlocking(new bool) (old bool) { + old, r.block = r.block, new + return +} + +// Close closes the underlying reader if it implements the io.Closer interface. +func (r *Reader) Close() error { + defer r.Done() + if c, ok := r.Reader.(io.Closer); ok { + return c.Close() + } + return nil +} + +// Writer implements io.WriteCloser with a restriction on the rate of data +// transfer. +type Writer struct { + io.Writer // Data destination + *Monitor // Flow control monitor + + limit int64 // Rate limit in bytes per second (unlimited when <= 0) + block bool // What to do when no new bytes can be written due to the limit +} + +// NewWriter restricts all Write operations on w to limit bytes per second. The +// transfer rate and the default blocking behavior (true) can be changed +// directly on the returned *Writer. +func NewWriter(w io.Writer, limit int64) *Writer { + return &Writer{w, New(0, 0), limit, true} +} + +// Write writes len(p) bytes from p to the underlying data stream without +// exceeding the current transfer rate limit. It returns (n, ErrLimit) if w is +// non-blocking and no additional bytes can be written at this time. +func (w *Writer) Write(p []byte) (n int, err error) { + var c int + for len(p) > 0 && err == nil { + s := p[:w.Limit(len(p), w.limit, w.block)] + if len(s) > 0 { + c, err = w.IO(w.Writer.Write(s)) + } else { + return n, ErrLimit + } + p = p[c:] + n += c + } + return +} + +// SetLimit changes the transfer rate limit to new bytes per second and returns +// the previous setting. +func (w *Writer) SetLimit(new int64) (old int64) { + old, w.limit = w.limit, new + return +} + +// SetBlocking changes the blocking behavior and returns the previous setting. A +// Write call on a non-blocking writer returns as soon as no additional bytes +// may be written at this time due to the rate limit. +func (w *Writer) SetBlocking(new bool) (old bool) { + old, w.block = w.block, new + return +} + +// Close closes the underlying writer if it implements the io.Closer interface. +func (w *Writer) Close() error { + defer w.Done() + if c, ok := w.Writer.(io.Closer); ok { + return c.Close() + } + return nil +} diff --git a/vendor/github.com/mxk/go-flowrate/flowrate/util.go b/vendor/github.com/mxk/go-flowrate/flowrate/util.go new file mode 100644 index 00000000..4caac583 --- /dev/null +++ b/vendor/github.com/mxk/go-flowrate/flowrate/util.go @@ -0,0 +1,67 @@ +// +// Written by Maxim Khitrov (November 2012) +// + +package flowrate + +import ( + "math" + "strconv" + "time" +) + +// clockRate is the resolution and precision of clock(). +const clockRate = 20 * time.Millisecond + +// czero is the process start time rounded down to the nearest clockRate +// increment. +var czero = time.Duration(time.Now().UnixNano()) / clockRate * clockRate + +// clock returns a low resolution timestamp relative to the process start time. +func clock() time.Duration { + return time.Duration(time.Now().UnixNano())/clockRate*clockRate - czero +} + +// clockToTime converts a clock() timestamp to an absolute time.Time value. +func clockToTime(c time.Duration) time.Time { + return time.Unix(0, int64(czero+c)) +} + +// clockRound returns d rounded to the nearest clockRate increment. +func clockRound(d time.Duration) time.Duration { + return (d + clockRate>>1) / clockRate * clockRate +} + +// round returns x rounded to the nearest int64 (non-negative values only). +func round(x float64) int64 { + if _, frac := math.Modf(x); frac >= 0.5 { + return int64(math.Ceil(x)) + } + return int64(math.Floor(x)) +} + +// Percent represents a percentage in increments of 1/1000th of a percent. +type Percent uint32 + +// percentOf calculates what percent of the total is x. +func percentOf(x, total float64) Percent { + if x < 0 || total <= 0 { + return 0 + } else if p := round(x / total * 1e5); p <= math.MaxUint32 { + return Percent(p) + } + return Percent(math.MaxUint32) +} + +func (p Percent) Float() float64 { + return float64(p) * 1e-3 +} + +func (p Percent) String() string { + var buf [12]byte + b := strconv.AppendUint(buf[:0], uint64(p)/1000, 10) + n := len(b) + b = strconv.AppendUint(b, 1000+uint64(p)%1000, 10) + b[n] = '.' + return string(append(b, '%')) +} diff --git a/vendor/k8s.io/apimachinery/pkg/util/proxy/dial.go b/vendor/k8s.io/apimachinery/pkg/util/proxy/dial.go new file mode 100644 index 00000000..a59b24c8 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/proxy/dial.go @@ -0,0 +1,117 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "net/http" + "net/url" + + "k8s.io/klog" + + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/third_party/forked/golang/netutil" +) + +func DialURL(ctx context.Context, url *url.URL, transport http.RoundTripper) (net.Conn, error) { + dialAddr := netutil.CanonicalAddr(url) + + dialer, err := utilnet.DialerFor(transport) + if err != nil { + klog.V(5).Infof("Unable to unwrap transport %T to get dialer: %v", transport, err) + } + + switch url.Scheme { + case "http": + if dialer != nil { + return dialer(ctx, "tcp", dialAddr) + } + var d net.Dialer + return d.DialContext(ctx, "tcp", dialAddr) + case "https": + // Get the tls config from the transport if we recognize it + var tlsConfig *tls.Config + var tlsConn *tls.Conn + var err error + tlsConfig, err = utilnet.TLSClientConfig(transport) + if err != nil { + klog.V(5).Infof("Unable to unwrap transport %T to get at TLS config: %v", transport, err) + } + + if dialer != nil { + // We have a dialer; use it to open the connection, then + // create a tls client using the connection. + netConn, err := dialer(ctx, "tcp", dialAddr) + if err != nil { + return nil, err + } + if tlsConfig == nil { + // tls.Client requires non-nil config + klog.Warningf("using custom dialer with no TLSClientConfig. Defaulting to InsecureSkipVerify") + // tls.Handshake() requires ServerName or InsecureSkipVerify + tlsConfig = &tls.Config{ + InsecureSkipVerify: true, + } + } else if len(tlsConfig.ServerName) == 0 && !tlsConfig.InsecureSkipVerify { + // tls.Handshake() requires ServerName or InsecureSkipVerify + // infer the ServerName from the hostname we're connecting to. + inferredHost := dialAddr + if host, _, err := net.SplitHostPort(dialAddr); err == nil { + inferredHost = host + } + // Make a copy to avoid polluting the provided config + tlsConfigCopy := tlsConfig.Clone() + tlsConfigCopy.ServerName = inferredHost + tlsConfig = tlsConfigCopy + } + tlsConn = tls.Client(netConn, tlsConfig) + if err := tlsConn.Handshake(); err != nil { + netConn.Close() + return nil, err + } + + } else { + // Dial. This Dial method does not allow to pass a context unfortunately + tlsConn, err = tls.Dial("tcp", dialAddr, tlsConfig) + if err != nil { + return nil, err + } + } + + // Return if we were configured to skip validation + if tlsConfig != nil && tlsConfig.InsecureSkipVerify { + return tlsConn, nil + } + + // Verify + host, _, _ := net.SplitHostPort(dialAddr) + if tlsConfig != nil && len(tlsConfig.ServerName) > 0 { + host = tlsConfig.ServerName + } + if err := tlsConn.VerifyHostname(host); err != nil { + tlsConn.Close() + return nil, err + } + + return tlsConn, nil + default: + return nil, fmt.Errorf("Unknown scheme: %s", url.Scheme) + } +} diff --git a/vendor/k8s.io/apimachinery/pkg/util/proxy/doc.go b/vendor/k8s.io/apimachinery/pkg/util/proxy/doc.go new file mode 100644 index 00000000..d14ecfad --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/proxy/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package proxy provides transport and upgrade support for proxies. +package proxy // import "k8s.io/apimachinery/pkg/util/proxy" diff --git a/vendor/k8s.io/apimachinery/pkg/util/proxy/transport.go b/vendor/k8s.io/apimachinery/pkg/util/proxy/transport.go new file mode 100644 index 00000000..aecafb35 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/proxy/transport.go @@ -0,0 +1,274 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "bytes" + "compress/flate" + "compress/gzip" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "path" + "strings" + + "golang.org/x/net/html" + "golang.org/x/net/html/atom" + "k8s.io/klog" + + "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/sets" +) + +// atomsToAttrs states which attributes of which tags require URL substitution. +// Sources: http://www.w3.org/TR/REC-html40/index/attributes.html +// http://www.w3.org/html/wg/drafts/html/master/index.html#attributes-1 +var atomsToAttrs = map[atom.Atom]sets.String{ + atom.A: sets.NewString("href"), + atom.Applet: sets.NewString("codebase"), + atom.Area: sets.NewString("href"), + atom.Audio: sets.NewString("src"), + atom.Base: sets.NewString("href"), + atom.Blockquote: sets.NewString("cite"), + atom.Body: sets.NewString("background"), + atom.Button: sets.NewString("formaction"), + atom.Command: sets.NewString("icon"), + atom.Del: sets.NewString("cite"), + atom.Embed: sets.NewString("src"), + atom.Form: sets.NewString("action"), + atom.Frame: sets.NewString("longdesc", "src"), + atom.Head: sets.NewString("profile"), + atom.Html: sets.NewString("manifest"), + atom.Iframe: sets.NewString("longdesc", "src"), + atom.Img: sets.NewString("longdesc", "src", "usemap"), + atom.Input: sets.NewString("src", "usemap", "formaction"), + atom.Ins: sets.NewString("cite"), + atom.Link: sets.NewString("href"), + atom.Object: sets.NewString("classid", "codebase", "data", "usemap"), + atom.Q: sets.NewString("cite"), + atom.Script: sets.NewString("src"), + atom.Source: sets.NewString("src"), + atom.Video: sets.NewString("poster", "src"), + + // TODO: css URLs hidden in style elements. +} + +// Transport is a transport for text/html content that replaces URLs in html +// content with the prefix of the proxy server +type Transport struct { + Scheme string + Host string + PathPrepend string + + http.RoundTripper +} + +// RoundTrip implements the http.RoundTripper interface +func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { + // Add reverse proxy headers. + forwardedURI := path.Join(t.PathPrepend, req.URL.Path) + if strings.HasSuffix(req.URL.Path, "/") { + forwardedURI = forwardedURI + "/" + } + req.Header.Set("X-Forwarded-Uri", forwardedURI) + if len(t.Host) > 0 { + req.Header.Set("X-Forwarded-Host", t.Host) + } + if len(t.Scheme) > 0 { + req.Header.Set("X-Forwarded-Proto", t.Scheme) + } + + rt := t.RoundTripper + if rt == nil { + rt = http.DefaultTransport + } + resp, err := rt.RoundTrip(req) + + if err != nil { + message := fmt.Sprintf("Error trying to reach service: '%v'", err.Error()) + resp = &http.Response{ + Header: http.Header{}, + StatusCode: http.StatusServiceUnavailable, + Body: ioutil.NopCloser(strings.NewReader(message)), + } + resp.Header.Set("Content-Type", "text/plain; charset=utf-8") + resp.Header.Set("X-Content-Type-Options", "nosniff") + return resp, nil + } + + if redirect := resp.Header.Get("Location"); redirect != "" { + resp.Header.Set("Location", t.rewriteURL(redirect, req.URL, req.Host)) + return resp, nil + } + + cType := resp.Header.Get("Content-Type") + cType = strings.TrimSpace(strings.SplitN(cType, ";", 2)[0]) + if cType != "text/html" { + // Do nothing, simply pass through + return resp, nil + } + + return t.rewriteResponse(req, resp) +} + +var _ = net.RoundTripperWrapper(&Transport{}) + +func (rt *Transport) WrappedRoundTripper() http.RoundTripper { + return rt.RoundTripper +} + +// rewriteURL rewrites a single URL to go through the proxy, if the URL refers +// to the same host as sourceURL, which is the page on which the target URL +// occurred, or if the URL matches the sourceRequestHost. If any error occurs (e.g. +// parsing), it returns targetURL. +func (t *Transport) rewriteURL(targetURL string, sourceURL *url.URL, sourceRequestHost string) string { + url, err := url.Parse(targetURL) + if err != nil { + return targetURL + } + + // Example: + // When API server processes a proxy request to a service (e.g. /api/v1/namespace/foo/service/bar/proxy/), + // the sourceURL.Host (i.e. req.URL.Host) is the endpoint IP address of the service. The + // sourceRequestHost (i.e. req.Host) is the Host header that specifies the host on which the + // URL is sought, which can be different from sourceURL.Host. For example, if user sends the + // request through "kubectl proxy" locally (i.e. localhost:8001/api/v1/namespace/foo/service/bar/proxy/), + // sourceRequestHost is "localhost:8001". + // + // If the service's response URL contains non-empty host, and url.Host is equal to either sourceURL.Host + // or sourceRequestHost, we should not consider the returned URL to be a completely different host. + // It's the API server's responsibility to rewrite a same-host-and-absolute-path URL and append the + // necessary URL prefix (i.e. /api/v1/namespace/foo/service/bar/proxy/). + isDifferentHost := url.Host != "" && url.Host != sourceURL.Host && url.Host != sourceRequestHost + isRelative := !strings.HasPrefix(url.Path, "/") + if isDifferentHost || isRelative { + return targetURL + } + + // Do not rewrite scheme and host if the Transport has empty scheme and host + // when targetURL already contains the sourceRequestHost + if !(url.Host == sourceRequestHost && t.Scheme == "" && t.Host == "") { + url.Scheme = t.Scheme + url.Host = t.Host + } + + origPath := url.Path + // Do not rewrite URL if the sourceURL already contains the necessary prefix. + if strings.HasPrefix(url.Path, t.PathPrepend) { + return url.String() + } + url.Path = path.Join(t.PathPrepend, url.Path) + if strings.HasSuffix(origPath, "/") { + // Add back the trailing slash, which was stripped by path.Join(). + url.Path += "/" + } + + return url.String() +} + +// rewriteHTML scans the HTML for tags with url-valued attributes, and updates +// those values with the urlRewriter function. The updated HTML is output to the +// writer. +func rewriteHTML(reader io.Reader, writer io.Writer, urlRewriter func(string) string) error { + // Note: This assumes the content is UTF-8. + tokenizer := html.NewTokenizer(reader) + + var err error + for err == nil { + tokenType := tokenizer.Next() + switch tokenType { + case html.ErrorToken: + err = tokenizer.Err() + case html.StartTagToken, html.SelfClosingTagToken: + token := tokenizer.Token() + if urlAttrs, ok := atomsToAttrs[token.DataAtom]; ok { + for i, attr := range token.Attr { + if urlAttrs.Has(attr.Key) { + token.Attr[i].Val = urlRewriter(attr.Val) + } + } + } + _, err = writer.Write([]byte(token.String())) + default: + _, err = writer.Write(tokenizer.Raw()) + } + } + if err != io.EOF { + return err + } + return nil +} + +// rewriteResponse modifies an HTML response by updating absolute links referring +// to the original host to instead refer to the proxy transport. +func (t *Transport) rewriteResponse(req *http.Request, resp *http.Response) (*http.Response, error) { + origBody := resp.Body + defer origBody.Close() + + newContent := &bytes.Buffer{} + var reader io.Reader = origBody + var writer io.Writer = newContent + encoding := resp.Header.Get("Content-Encoding") + switch encoding { + case "gzip": + var err error + reader, err = gzip.NewReader(reader) + if err != nil { + return nil, fmt.Errorf("errorf making gzip reader: %v", err) + } + gzw := gzip.NewWriter(writer) + defer gzw.Close() + writer = gzw + case "deflate": + var err error + reader = flate.NewReader(reader) + flw, err := flate.NewWriter(writer, flate.BestCompression) + if err != nil { + return nil, fmt.Errorf("errorf making flate writer: %v", err) + } + defer func() { + flw.Close() + flw.Flush() + }() + writer = flw + case "": + // This is fine + default: + // Some encoding we don't understand-- don't try to parse this + klog.Errorf("Proxy encountered encoding %v for text/html; can't understand this so not fixing links.", encoding) + return resp, nil + } + + urlRewriter := func(targetUrl string) string { + return t.rewriteURL(targetUrl, req.URL, req.Host) + } + err := rewriteHTML(reader, writer, urlRewriter) + if err != nil { + klog.Errorf("Failed to rewrite URLs: %v", err) + return resp, err + } + + resp.Body = ioutil.NopCloser(newContent) + // Update header node with new content-length + // TODO: Remove any hash/signature headers here? + resp.Header.Del("Content-Length") + resp.ContentLength = int64(newContent.Len()) + + return resp, err +} diff --git a/vendor/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go b/vendor/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go new file mode 100644 index 00000000..fcdc76a0 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go @@ -0,0 +1,480 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "bufio" + "bytes" + "fmt" + "io" + "io/ioutil" + "log" + "net" + "net/http" + "net/http/httputil" + "net/url" + "os" + "strings" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/httpstream" + utilnet "k8s.io/apimachinery/pkg/util/net" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + + "github.com/mxk/go-flowrate/flowrate" + "k8s.io/klog" +) + +// UpgradeRequestRoundTripper provides an additional method to decorate a request +// with any authentication or other protocol level information prior to performing +// an upgrade on the server. Any response will be handled by the intercepting +// proxy. +type UpgradeRequestRoundTripper interface { + http.RoundTripper + // WrapRequest takes a valid HTTP request and returns a suitably altered version + // of request with any HTTP level values required to complete the request half of + // an upgrade on the server. It does not get a chance to see the response and + // should bypass any request side logic that expects to see the response. + WrapRequest(*http.Request) (*http.Request, error) +} + +// UpgradeAwareHandler is a handler for proxy requests that may require an upgrade +type UpgradeAwareHandler struct { + // UpgradeRequired will reject non-upgrade connections if true. + UpgradeRequired bool + // Location is the location of the upstream proxy. It is used as the location to Dial on the upstream server + // for upgrade requests unless UseRequestLocationOnUpgrade is true. + Location *url.URL + // Transport provides an optional round tripper to use to proxy. If nil, the default proxy transport is used + Transport http.RoundTripper + // UpgradeTransport, if specified, will be used as the backend transport when upgrade requests are provided. + // This allows clients to disable HTTP/2. + UpgradeTransport UpgradeRequestRoundTripper + // WrapTransport indicates whether the provided Transport should be wrapped with default proxy transport behavior (URL rewriting, X-Forwarded-* header setting) + WrapTransport bool + // InterceptRedirects determines whether the proxy should sniff backend responses for redirects, + // following them as necessary. + InterceptRedirects bool + // RequireSameHostRedirects only allows redirects to the same host. It is only used if InterceptRedirects=true. + RequireSameHostRedirects bool + // UseRequestLocation will use the incoming request URL when talking to the backend server. + UseRequestLocation bool + // FlushInterval controls how often the standard HTTP proxy will flush content from the upstream. + FlushInterval time.Duration + // MaxBytesPerSec controls the maximum rate for an upstream connection. No rate is imposed if the value is zero. + MaxBytesPerSec int64 + // Responder is passed errors that occur while setting up proxying. + Responder ErrorResponder +} + +const defaultFlushInterval = 200 * time.Millisecond + +// ErrorResponder abstracts error reporting to the proxy handler to remove the need to hardcode a particular +// error format. +type ErrorResponder interface { + Error(w http.ResponseWriter, req *http.Request, err error) +} + +// SimpleErrorResponder is the legacy implementation of ErrorResponder for callers that only +// service a single request/response per proxy. +type SimpleErrorResponder interface { + Error(err error) +} + +func NewErrorResponder(r SimpleErrorResponder) ErrorResponder { + return simpleResponder{r} +} + +type simpleResponder struct { + responder SimpleErrorResponder +} + +func (r simpleResponder) Error(w http.ResponseWriter, req *http.Request, err error) { + r.responder.Error(err) +} + +// upgradeRequestRoundTripper implements proxy.UpgradeRequestRoundTripper. +type upgradeRequestRoundTripper struct { + http.RoundTripper + upgrader http.RoundTripper +} + +var ( + _ UpgradeRequestRoundTripper = &upgradeRequestRoundTripper{} + _ utilnet.RoundTripperWrapper = &upgradeRequestRoundTripper{} +) + +// WrappedRoundTripper returns the round tripper that a caller would use. +func (rt *upgradeRequestRoundTripper) WrappedRoundTripper() http.RoundTripper { + return rt.RoundTripper +} + +// WriteToRequest calls the nested upgrader and then copies the returned request +// fields onto the passed request. +func (rt *upgradeRequestRoundTripper) WrapRequest(req *http.Request) (*http.Request, error) { + resp, err := rt.upgrader.RoundTrip(req) + if err != nil { + return nil, err + } + return resp.Request, nil +} + +// onewayRoundTripper captures the provided request - which is assumed to have +// been modified by other round trippers - and then returns a fake response. +type onewayRoundTripper struct{} + +// RoundTrip returns a simple 200 OK response that captures the provided request. +func (onewayRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return &http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(&bytes.Buffer{}), + Request: req, + }, nil +} + +// MirrorRequest is a round tripper that can be called to get back the calling request as +// the core round tripper in a chain. +var MirrorRequest http.RoundTripper = onewayRoundTripper{} + +// NewUpgradeRequestRoundTripper takes two round trippers - one for the underlying TCP connection, and +// one that is able to write headers to an HTTP request. The request rt is used to set the request headers +// and that is written to the underlying connection rt. +func NewUpgradeRequestRoundTripper(connection, request http.RoundTripper) UpgradeRequestRoundTripper { + return &upgradeRequestRoundTripper{ + RoundTripper: connection, + upgrader: request, + } +} + +// normalizeLocation returns the result of parsing the full URL, with scheme set to http if missing +func normalizeLocation(location *url.URL) *url.URL { + normalized, _ := url.Parse(location.String()) + if len(normalized.Scheme) == 0 { + normalized.Scheme = "http" + } + return normalized +} + +// NewUpgradeAwareHandler creates a new proxy handler with a default flush interval. Responder is required for returning +// errors to the caller. +func NewUpgradeAwareHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder ErrorResponder) *UpgradeAwareHandler { + return &UpgradeAwareHandler{ + Location: normalizeLocation(location), + Transport: transport, + WrapTransport: wrapTransport, + UpgradeRequired: upgradeRequired, + FlushInterval: defaultFlushInterval, + Responder: responder, + } +} + +// ServeHTTP handles the proxy request +func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if h.tryUpgrade(w, req) { + return + } + if h.UpgradeRequired { + h.Responder.Error(w, req, errors.NewBadRequest("Upgrade request required")) + return + } + + loc := *h.Location + loc.RawQuery = req.URL.RawQuery + + // If original request URL ended in '/', append a '/' at the end of the + // of the proxy URL + if !strings.HasSuffix(loc.Path, "/") && strings.HasSuffix(req.URL.Path, "/") { + loc.Path += "/" + } + + // From pkg/genericapiserver/endpoints/handlers/proxy.go#ServeHTTP: + // Redirect requests with an empty path to a location that ends with a '/' + // This is essentially a hack for http://issue.k8s.io/4958. + // Note: Keep this code after tryUpgrade to not break that flow. + if len(loc.Path) == 0 { + var queryPart string + if len(req.URL.RawQuery) > 0 { + queryPart = "?" + req.URL.RawQuery + } + w.Header().Set("Location", req.URL.Path+"/"+queryPart) + w.WriteHeader(http.StatusMovedPermanently) + return + } + + if h.Transport == nil || h.WrapTransport { + h.Transport = h.defaultProxyTransport(req.URL, h.Transport) + } + + // WithContext creates a shallow clone of the request with the same context. + newReq := req.WithContext(req.Context()) + newReq.Header = utilnet.CloneHeader(req.Header) + if !h.UseRequestLocation { + newReq.URL = &loc + } + + proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host}) + proxy.Transport = h.Transport + proxy.FlushInterval = h.FlushInterval + proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags) + proxy.ServeHTTP(w, newReq) +} + +type noSuppressPanicError struct{} + +func (noSuppressPanicError) Write(p []byte) (n int, err error) { + // skip "suppressing panic for copyResponse error in test; copy error" error message + // that ends up in CI tests on each kube-apiserver termination as noise and + // everybody thinks this is fatal. + if strings.Contains(string(p), "suppressing panic") { + return len(p), nil + } + return os.Stderr.Write(p) +} + +// tryUpgrade returns true if the request was handled. +func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool { + if !httpstream.IsUpgradeRequest(req) { + klog.V(6).Infof("Request was not an upgrade") + return false + } + + var ( + backendConn net.Conn + rawResponse []byte + err error + ) + + location := *h.Location + if h.UseRequestLocation { + location = *req.URL + location.Scheme = h.Location.Scheme + location.Host = h.Location.Host + } + + clone := utilnet.CloneRequest(req) + // Only append X-Forwarded-For in the upgrade path, since httputil.NewSingleHostReverseProxy + // handles this in the non-upgrade path. + utilnet.AppendForwardedForHeader(clone) + if h.InterceptRedirects { + klog.V(6).Infof("Connecting to backend proxy (intercepting redirects) %s\n Headers: %v", &location, clone.Header) + backendConn, rawResponse, err = utilnet.ConnectWithRedirects(req.Method, &location, clone.Header, req.Body, utilnet.DialerFunc(h.DialForUpgrade), h.RequireSameHostRedirects) + } else { + klog.V(6).Infof("Connecting to backend proxy (direct dial) %s\n Headers: %v", &location, clone.Header) + clone.URL = &location + backendConn, err = h.DialForUpgrade(clone) + } + if err != nil { + klog.V(6).Infof("Proxy connection error: %v", err) + h.Responder.Error(w, req, err) + return true + } + defer backendConn.Close() + + // determine the http response code from the backend by reading from rawResponse+backendConn + backendHTTPResponse, headerBytes, err := getResponse(io.MultiReader(bytes.NewReader(rawResponse), backendConn)) + if err != nil { + klog.V(6).Infof("Proxy connection error: %v", err) + h.Responder.Error(w, req, err) + return true + } + if len(headerBytes) > len(rawResponse) { + // we read beyond the bytes stored in rawResponse, update rawResponse to the full set of bytes read from the backend + rawResponse = headerBytes + } + + // Once the connection is hijacked, the ErrorResponder will no longer work, so + // hijacking should be the last step in the upgrade. + requestHijacker, ok := w.(http.Hijacker) + if !ok { + klog.V(6).Infof("Unable to hijack response writer: %T", w) + h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w)) + return true + } + requestHijackedConn, _, err := requestHijacker.Hijack() + if err != nil { + klog.V(6).Infof("Unable to hijack response: %v", err) + h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err)) + return true + } + defer requestHijackedConn.Close() + + if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols { + // If the backend did not upgrade the request, echo the response from the backend to the client and return, closing the connection. + klog.V(6).Infof("Proxy upgrade error, status code %d", backendHTTPResponse.StatusCode) + // set read/write deadlines + deadline := time.Now().Add(10 * time.Second) + backendConn.SetReadDeadline(deadline) + requestHijackedConn.SetWriteDeadline(deadline) + // write the response to the client + err := backendHTTPResponse.Write(requestHijackedConn) + if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { + klog.Errorf("Error proxying data from backend to client: %v", err) + } + // Indicate we handled the request + return true + } + + // Forward raw response bytes back to client. + if len(rawResponse) > 0 { + klog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse)) + if _, err = requestHijackedConn.Write(rawResponse); err != nil { + utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err)) + } + } + + // Proxy the connection. This is bidirectional, so we need a goroutine + // to copy in each direction. Once one side of the connection exits, we + // exit the function which performs cleanup and in the process closes + // the other half of the connection in the defer. + writerComplete := make(chan struct{}) + readerComplete := make(chan struct{}) + + go func() { + var writer io.WriteCloser + if h.MaxBytesPerSec > 0 { + writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec) + } else { + writer = backendConn + } + _, err := io.Copy(writer, requestHijackedConn) + if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { + klog.Errorf("Error proxying data from client to backend: %v", err) + } + close(writerComplete) + }() + + go func() { + var reader io.ReadCloser + if h.MaxBytesPerSec > 0 { + reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec) + } else { + reader = backendConn + } + _, err := io.Copy(requestHijackedConn, reader) + if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { + klog.Errorf("Error proxying data from backend to client: %v", err) + } + close(readerComplete) + }() + + // Wait for one half the connection to exit. Once it does the defer will + // clean up the other half of the connection. + select { + case <-writerComplete: + case <-readerComplete: + } + klog.V(6).Infof("Disconnecting from backend proxy %s\n Headers: %v", &location, clone.Header) + + return true +} + +func (h *UpgradeAwareHandler) Dial(req *http.Request) (net.Conn, error) { + return dial(req, h.Transport) +} + +func (h *UpgradeAwareHandler) DialForUpgrade(req *http.Request) (net.Conn, error) { + if h.UpgradeTransport == nil { + return dial(req, h.Transport) + } + updatedReq, err := h.UpgradeTransport.WrapRequest(req) + if err != nil { + return nil, err + } + return dial(updatedReq, h.UpgradeTransport) +} + +// getResponseCode reads a http response from the given reader, returns the response, +// the bytes read from the reader, and any error encountered +func getResponse(r io.Reader) (*http.Response, []byte, error) { + rawResponse := bytes.NewBuffer(make([]byte, 0, 256)) + // Save the bytes read while reading the response headers into the rawResponse buffer + resp, err := http.ReadResponse(bufio.NewReader(io.TeeReader(r, rawResponse)), nil) + if err != nil { + return nil, nil, err + } + // return the http response and the raw bytes consumed from the reader in the process + return resp, rawResponse.Bytes(), nil +} + +// dial dials the backend at req.URL and writes req to it. +func dial(req *http.Request, transport http.RoundTripper) (net.Conn, error) { + conn, err := DialURL(req.Context(), req.URL, transport) + if err != nil { + return nil, fmt.Errorf("error dialing backend: %v", err) + } + + if err = req.Write(conn); err != nil { + conn.Close() + return nil, fmt.Errorf("error sending request: %v", err) + } + + return conn, err +} + +var _ utilnet.Dialer = &UpgradeAwareHandler{} + +func (h *UpgradeAwareHandler) defaultProxyTransport(url *url.URL, internalTransport http.RoundTripper) http.RoundTripper { + scheme := url.Scheme + host := url.Host + suffix := h.Location.Path + if strings.HasSuffix(url.Path, "/") && !strings.HasSuffix(suffix, "/") { + suffix += "/" + } + pathPrepend := strings.TrimSuffix(url.Path, suffix) + rewritingTransport := &Transport{ + Scheme: scheme, + Host: host, + PathPrepend: pathPrepend, + RoundTripper: internalTransport, + } + return &corsRemovingTransport{ + RoundTripper: rewritingTransport, + } +} + +// corsRemovingTransport is a wrapper for an internal transport. It removes CORS headers +// from the internal response. +// Implements pkg/util/net.RoundTripperWrapper +type corsRemovingTransport struct { + http.RoundTripper +} + +var _ = utilnet.RoundTripperWrapper(&corsRemovingTransport{}) + +func (rt *corsRemovingTransport) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := rt.RoundTripper.RoundTrip(req) + if err != nil { + return nil, err + } + removeCORSHeaders(resp) + return resp, nil +} + +func (rt *corsRemovingTransport) WrappedRoundTripper() http.RoundTripper { + return rt.RoundTripper +} + +// removeCORSHeaders strip CORS headers sent from the backend +// This should be called on all responses before returning +func removeCORSHeaders(resp *http.Response) { + resp.Header.Del("Access-Control-Allow-Credentials") + resp.Header.Del("Access-Control-Allow-Headers") + resp.Header.Del("Access-Control-Allow-Methods") + resp.Header.Del("Access-Control-Allow-Origin") +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 09e8a026..a611599d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -482,6 +482,8 @@ github.com/modern-go/concurrent github.com/modern-go/reflect2 # github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d => github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d github.com/munnerz/goautoneg +# github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f => github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f +github.com/mxk/go-flowrate/flowrate # github.com/naoina/go-stringutil v0.1.0 => github.com/naoina/go-stringutil v0.1.0 github.com/naoina/go-stringutil # github.com/naoina/toml v0.1.1 => github.com/naoina/toml v0.1.1 @@ -999,6 +1001,7 @@ k8s.io/apimachinery/pkg/util/json k8s.io/apimachinery/pkg/util/mergepatch k8s.io/apimachinery/pkg/util/naming k8s.io/apimachinery/pkg/util/net +k8s.io/apimachinery/pkg/util/proxy k8s.io/apimachinery/pkg/util/rand k8s.io/apimachinery/pkg/util/remotecommand k8s.io/apimachinery/pkg/util/runtime -- GitLab