未验证 提交 f8e7d06b 编写于 作者: Z zryfish 提交者: GitHub

move apigateway into apiserver (#1948)

上级 dab71e71
......@@ -8,6 +8,7 @@ import (
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/api/iam"
"kubesphere.io/kubesphere/pkg/apiserver"
"kubesphere.io/kubesphere/pkg/informers"
genericoptions "kubesphere.io/kubesphere/pkg/server/options"
"kubesphere.io/kubesphere/pkg/simple/client/cache"
"kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins"
......@@ -89,36 +90,66 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
}
func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIServer, error) {
apiServer := &apiserver.APIServer{}
kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
if err != nil {
return nil, err
}
apiServer.KubernetesClient = kubernetesClient
informerFactory := informers.NewInformerFactories(kubernetesClient.Kubernetes(), kubernetesClient.KubeSphere(), kubernetesClient.Istio(), kubernetesClient.Application())
apiServer.InformerFactory = informerFactory
monitoringClient := prometheus.NewPrometheus(s.MonitoringOptions)
apiServer.MonitoringClient = monitoringClient
loggingClient, err := esclient.NewElasticsearch(s.LoggingOptions)
if err != nil {
return nil, err
if s.LoggingOptions.Host != "" {
loggingClient, err := esclient.NewElasticsearch(s.LoggingOptions)
if err != nil {
return nil, err
}
apiServer.LoggingClient = loggingClient
}
s3Client, err := s3.NewS3Client(s.S3Options)
if err != nil {
return nil, err
if s.S3Options.Endpoint != "" {
s3Client, err := s3.NewS3Client(s.S3Options)
if err != nil {
return nil, err
}
apiServer.S3Client = s3Client
}
devopsClient, err := jenkins.NewDevopsClient(s.DevopsOptions)
if err != nil {
return nil, err
if s.DevopsOptions.Host != "" {
devopsClient, err := jenkins.NewDevopsClient(s.DevopsOptions)
if err != nil {
return nil, err
}
apiServer.DevopsClient = devopsClient
}
ldapClient, err := ldap.NewLdapClient(s.LdapOptions, stopCh)
if err != nil {
return nil, err
if s.LdapOptions.Host != "" {
ldapClient, err := ldap.NewLdapClient(s.LdapOptions, stopCh)
if err != nil {
return nil, err
}
apiServer.LdapClient = ldapClient
}
cacheClient, err := cache.NewRedisClient(s.CacheOptions, stopCh)
if err != nil {
return nil, err
if s.CacheOptions.RedisURL != "" {
cacheClient, err := cache.NewRedisClient(s.CacheOptions, stopCh)
if err != nil {
return nil, err
}
apiServer.CacheClient = cacheClient
}
if s.MySQLOptions.Host != "" {
dbClient, err := mysql.NewMySQLClient(s.MySQLOptions, stopCh)
if err != nil {
return nil, err
}
apiServer.DBClient = dbClient
}
server := &http.Server{
......@@ -133,16 +164,7 @@ func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIS
server.TLSConfig.Certificates = []tls.Certificate{certificate}
}
apiServer := &apiserver.APIServer{
Server: server,
KubernetesClient: kubernetesClient,
MonitoringClient: monitoringClient,
LoggingClient: loggingClient,
S3Client: s3Client,
DevopsClient: devopsClient,
LdapClient: ldapClient,
CacheClient: cacheClient,
}
apiServer.Server = server
return apiServer, nil
}
......@@ -25,7 +25,6 @@ import (
cliflag "k8s.io/component-base/cli/flag"
"kubesphere.io/kubesphere/cmd/ks-apiserver/app/options"
apiserverconfig "kubesphere.io/kubesphere/pkg/apiserver/config"
"kubesphere.io/kubesphere/pkg/apiserver/servicemesh/tracing"
"kubesphere.io/kubesphere/pkg/utils/signals"
"kubesphere.io/kubesphere/pkg/utils/term"
)
......@@ -100,7 +99,7 @@ func initializeServicemeshConfig(s *options.ServerRunOptions) {
// Initialize kiali config
config := kconfig.NewConfig()
tracing.JaegerQueryUrl = s.ServiceMeshOptions.JaegerQueryHost
//tracing.JaegerQueryUrl = s.ServiceMeshOptions.JaegerQueryHost
// Exclude system namespaces
config.API.Namespaces.Exclude = []string{"istio-system", "kubesphere*", "kube*"}
......
......@@ -7,9 +7,18 @@ import (
"github.com/emicklei/go-restful"
"k8s.io/apimachinery/pkg/runtime/schema"
urlruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/authentication/request/bearertoken"
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/api/iam"
"kubesphere.io/kubesphere/pkg/apiserver/authentication"
"kubesphere.io/kubesphere/pkg/apiserver/dispatch"
"kubesphere.io/kubesphere/pkg/apiserver/filters"
"kubesphere.io/kubesphere/pkg/apiserver/request"
"kubesphere.io/kubesphere/pkg/informers"
devopsv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/devops/v1alpha2"
iamv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/iam/v1alpha2"
loggingv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/logging/v1alpha2"
monitoringv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/monitoring/v1alpha2"
......@@ -18,14 +27,15 @@ import (
resourcesv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/resources/v1alpha2"
resourcev1alpha3 "kubesphere.io/kubesphere/pkg/kapis/resources/v1alpha3"
servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/servicemesh/metrics/v1alpha2"
tenantv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/tenant/v1alpha2"
terminalv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/terminal/v1alpha2"
"kubesphere.io/kubesphere/pkg/simple/client/cache"
"kubesphere.io/kubesphere/pkg/simple/client/db"
"kubesphere.io/kubesphere/pkg/simple/client/devops"
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
"kubesphere.io/kubesphere/pkg/simple/client/ldap"
"kubesphere.io/kubesphere/pkg/simple/client/logging"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
"kubesphere.io/kubesphere/pkg/simple/client/mysql"
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix"
"kubesphere.io/kubesphere/pkg/simple/client/s3"
"net"
......@@ -85,7 +95,7 @@ type APIServer struct {
S3Client s3.Interface
//
DBClient db.Interface
DBClient *mysql.Client
//
LdapClient ldap.Interface
......@@ -104,26 +114,31 @@ func (s *APIServer) PrepareRun() error {
s.installKubeSphereAPIs()
for _, ws := range s.container.RegisteredWebServices() {
klog.V(2).Infof("%s", ws.RootPath())
}
s.Server.Handler = s.container
s.buildHandlerChain()
return nil
}
func (s *APIServer) installKubeSphereAPIs() {
urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory))
urlruntime.Must(servicemeshv1alpha2.AddToContainer(s.container))
// Need to refactor devops api registration, too much dependencies
//urlruntime.Must(devopsv1alpha2.AddToContainer(s.container, s.DevopsClient))
urlruntime.Must(devopsv1alpha2.AddToContainer(s.container, s.DevopsClient, s.DBClient.Database(), nil, s.KubernetesClient.KubeSphere(), s.InformerFactory.KubeSphereSharedInformerFactory(), s.S3Client))
urlruntime.Must(loggingv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.LoggingClient))
urlruntime.Must(monitoringv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.MonitoringClient))
urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.OpenpitrixClient))
urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes()))
urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory))
//urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.InformerFactory, s.DBClient))
urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.InformerFactory, s.DBClient.Database()))
urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.KubernetesClient.Config()))
urlruntime.Must(iamv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.InformerFactory, s.LdapClient, s.CacheClient, s.AuthenticateOptions))
urlruntime.Must(servicemeshv1alpha2.AddToContainer(s.container))
}
func (s *APIServer) Run(stopCh <-chan struct{}) error {
......@@ -141,6 +156,7 @@ func (s *APIServer) Run(stopCh <-chan struct{}) error {
_ = s.Server.Shutdown(ctx)
}()
klog.V(0).Infof("Start listening on %s", s.Server.Addr)
if s.Server.TLSConfig != nil {
return s.Server.ListenAndServeTLS("", "")
} else {
......@@ -148,6 +164,27 @@ func (s *APIServer) Run(stopCh <-chan struct{}) error {
}
}
func (s *APIServer) buildHandlerChain() {
requestInfoResolver := &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis", "kapis", "kapi"),
GrouplessAPIPrefixes: sets.NewString("api", "kapi"),
}
failed := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusUnauthorized)
})
handler := s.Server.Handler
handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{})
handler = filters.WithMultipleClusterDispatcher(handler, dispatch.DefaultClusterDispatch)
handler = filters.WithAuthorization(handler, authorizerfactory.NewAlwaysAllowAuthorizer())
handler = filters.WithAuthentication(handler, bearertoken.New(authentication.NewTokenAuthenticator(s.CacheClient)), failed)
handler = filters.WithRequestInfo(handler, requestInfoResolver)
s.Server.Handler = handler
}
func (s *APIServer) waitForResourceSync(stopCh <-chan struct{}) error {
klog.V(0).Info("Start cache objects")
......@@ -338,3 +375,10 @@ func getRequestIP(req *restful.Request) string {
return address
}
type errorResponder struct{}
func (e *errorResponder) Error(w http.ResponseWriter, req *http.Request, err error) {
klog.Error(err)
responsewriters.InternalError(w, req, err)
}
package authentication
import (
"context"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/user"
"kubesphere.io/kubesphere/pkg/simple/client/cache"
)
type TokenAuthenticator struct {
cacheClient cache.Interface
}
func NewTokenAuthenticator(cacheClient cache.Interface) authenticator.Token {
return &TokenAuthenticator{
cacheClient: cacheClient,
}
}
func (t *TokenAuthenticator) AuthenticateToken(ctx context.Context, token string) (*authenticator.Response, bool, error) {
return &authenticator.Response{
User: &user.DefaultInfo{
Name: "admin",
UID: "",
Groups: nil,
Extra: nil,
},
}, true, nil
}
package dispatch
import (
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"net/http"
"k8s.io/apimachinery/pkg/util/proxy"
)
// Dispatcher defines how to forward request to desired cluster apiserver
type Dispatcher interface {
Dispatch(w http.ResponseWriter, req *http.Request)
}
var DefaultClusterDispatch = newClusterDispatch()
type clusterDispatch struct {
transport *http.Transport
}
func newClusterDispatch() Dispatcher {
return &clusterDispatch{}
}
func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request) {
u := *req.URL
// u.Host = someHost
httpProxy := proxy.NewUpgradeAwareHandler(&u, c.transport, false, false, c)
httpProxy.ServeHTTP(w, req)
}
func (c *clusterDispatch) Error(w http.ResponseWriter, req *http.Request, err error) {
responsewriters.InternalError(w, req, err)
}
package filters
import (
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog"
"net/http"
)
func WithAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler) http.Handler {
if auth == nil {
klog.Warningf("Authentication is disabled")
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
//authenticationStart := time.Now()
resp, ok, err := auth.AuthenticateRequest(req)
if err != nil || !ok {
if err != nil {
klog.Errorf("Unable to authenticate the request due to error: %v", err)
}
failed.ServeHTTP(w, req)
return
}
// authorization header is not required anymore in case of a successful authentication.
req.Header.Del("Authorization")
req = req.WithContext(request.WithUser(req.Context(), resp.User))
handler.ServeHTTP(w, req)
})
}
package filters
import (
"context"
"errors"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
k8srequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apiserver/request"
"net/http"
)
// WithAuthorization passes all authorized requests on to handler, and returns forbidden error otherwise.
func WithAuthorization(handler http.Handler, a authorizer.Authorizer) http.Handler {
if a == nil {
klog.Warningf("Authorization is disabled")
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
attributes, err := GetAuthorizerAttributes(ctx)
if err != nil {
responsewriters.InternalError(w, req, err)
}
authorized, reason, err := a.Authorize(attributes)
if authorized == authorizer.DecisionAllow {
handler.ServeHTTP(w, req)
return
}
if err != nil {
responsewriters.InternalError(w, req, err)
return
}
klog.V(4).Infof("Forbidden: %#v, Reason: %q", req.RequestURI, reason)
w.WriteHeader(http.StatusForbidden)
})
}
func GetAuthorizerAttributes(ctx context.Context) (authorizer.Attributes, error) {
attribs := authorizer.AttributesRecord{}
user, ok := k8srequest.UserFrom(ctx)
if ok {
attribs.User = user
}
requestInfo, found := request.RequestInfoFrom(ctx)
if !found {
return nil, errors.New("no RequestInfo found in the context")
}
// Start with common attributes that apply to resource and non-resource requests
attribs.ResourceRequest = requestInfo.IsResourceRequest
attribs.Path = requestInfo.Path
attribs.Verb = requestInfo.Verb
attribs.APIGroup = requestInfo.APIGroup
attribs.APIVersion = requestInfo.APIVersion
attribs.Resource = requestInfo.Resource
attribs.Subresource = requestInfo.Subresource
attribs.Namespace = requestInfo.Namespace
attribs.Name = requestInfo.Name
return &attribs, nil
}
package filters
import (
"fmt"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apiserver/dispatch"
"kubesphere.io/kubesphere/pkg/apiserver/request"
"net/http"
)
// Multiple cluster dispatcher forward request to desired cluster based on request cluster name
// which included in request path clusters/{cluster}
func WithMultipleClusterDispatcher(handler http.Handler, dispatch dispatch.Dispatcher) http.Handler {
if dispatch == nil {
klog.V(4).Infof("Multiple cluster dispatcher is disabled")
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
info, ok := request.RequestInfoFrom(req.Context())
if !ok {
responsewriters.InternalError(w, req, fmt.Errorf(""))
return
}
if info.Cluster == "" {
handler.ServeHTTP(w, req)
} else {
dispatch.Dispatch(w, req)
}
})
}
package filters
import (
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/client-go/rest"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apiserver/request"
"kubesphere.io/kubesphere/pkg/server/errors"
"net/http"
"net/url"
"k8s.io/apimachinery/pkg/util/proxy"
)
// WithKubeAPIServer proxy request to kubernetes service if requests path starts with /api
func WithKubeAPIServer(handler http.Handler, config *rest.Config, failed proxy.ErrorResponder) http.Handler {
kubernetes, _ := url.Parse(config.Host)
defaultTransport, err := rest.TransportFor(config)
if err != nil {
klog.Errorf("Unable to create transport from rest.Config: %v", err)
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
info, ok := request.RequestInfoFrom(req.Context())
if !ok {
err := errors.New("Unable to retrieve request info from request")
klog.Error(err)
responsewriters.InternalError(w, req, err)
}
if info.IsKubernetesRequest {
s := *req.URL
s.Host = kubernetes.Host
s.Scheme = kubernetes.Scheme
httpProxy := proxy.NewUpgradeAwareHandler(&s, defaultTransport, true, false, failed)
httpProxy.ServeHTTP(w, req)
return
}
handler.ServeHTTP(w, req)
})
}
package filters
import (
"fmt"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"kubesphere.io/kubesphere/pkg/apiserver/request"
"net/http"
)
func WithRequestInfo(handler http.Handler, resolver request.RequestInfoResolver) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
info, err := resolver.NewRequestInfo(req)
if err != nil {
responsewriters.InternalError(w, req, fmt.Errorf("failed to crate RequestInfo: %v", err))
return
}
req = req.WithContext(request.WithRequestInfo(ctx, info))
handler.ServeHTTP(w, req)
})
}
package request
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/util/sets"
"net/http"
"strings"
k8srequest "k8s.io/apiserver/pkg/endpoints/request"
)
type RequestInfoResolver interface {
NewRequestInfo(req *http.Request) (*RequestInfo, error)
}
// specialVerbs contains just strings which are used in REST paths for special actions that don't fall under the normal
// CRUDdy GET/POST/PUT/DELETE actions on REST objects.
// master's Mux.
var specialVerbs = sets.NewString("proxy", "watch")
var kubernetesAPIPrefixes = sets.NewString("api", "apis")
// RequestInfo holds information parsed from the http.Request,
// extended from k8s.io/apiserver/pkg/endpoints/request/requestinfo.go
type RequestInfo struct {
*k8srequest.RequestInfo
// IsKubeSphereRequest indicates whether or not the request should be handled by kubernetes or kubesphere
IsKubernetesRequest bool
// Workspace of requested namespace, for non-workspaced resources, this may be empty
Workspace string
// Cluster of requested resource, this is empty in single-cluster environment
Cluster string
}
type RequestInfoFactory struct {
APIPrefixes sets.String
GrouplessAPIPrefixes sets.String
k8sRequestInfoFactory *k8srequest.RequestInfoFactory
}
// NewRequestInfo returns the information from the http request. If error is not nil, RequestInfo holds the information as best it is known before the failure
// It handles both resource and non-resource requests and fills in all the pertinent information for each.
// Valid Inputs:
//
// /apis/{api-group}/{version}/namespaces
// /api/{version}/namespaces
// /api/{version}/namespaces/{namespace}
// /api/{version}/namespaces/{namespace}/{resource}
// /api/{version}/namespaces/{namespace}/{resource}/{resourceName}
// /api/{version}/{resource}
// /api/{version}/{resource}/{resourceName}
//
// Special verbs without subresources:
// /api/{version}/proxy/{resource}/{resourceName}
// /api/{version}/proxy/namespaces/{namespace}/{resource}/{resourceName}
//
// Special verbs with subresources:
// /api/{version}/watch/{resource}
// /api/{version}/watch/namespaces/{namespace}/{resource}
//
// /kapis/{api-group}/{version}/workspaces/{workspace}/{resource}/{resourceName}
// /
// /kapis/{api-group}/{version}/namespaces/{namespace}/{resource}
// /kapis/{api-group}/{version}/namespaces/{namespace}/{resource}/{resourceName}
// With workspaces:
// /kapis/{api-group}/{version}/clusters/{cluster}/namespaces/{namespace}/{resource}
// /kapis/{api-group}/{version}/clusters/{cluster}/namespaces/{namespace}/{resource}/{resourceName}
//
func (r *RequestInfoFactory) NewRequestInfo(req *http.Request) (*RequestInfo, error) {
requestInfo := RequestInfo{
IsKubernetesRequest: false,
RequestInfo: &k8srequest.RequestInfo{
Path: req.URL.Path,
Verb: req.Method,
},
}
defer func() {
if kubernetesAPIPrefixes.Has(requestInfo.APIPrefix) {
requestInfo.IsKubernetesRequest = true
}
}()
currentParts := splitPath(req.URL.Path)
if len(currentParts) < 3 {
return &requestInfo, nil
}
if !r.APIPrefixes.Has(currentParts[0]) {
// return a non-resource request
return &requestInfo, nil
}
requestInfo.APIPrefix = currentParts[0]
currentParts = currentParts[1:]
if !r.GrouplessAPIPrefixes.Has(requestInfo.APIPrefix) {
if len(currentParts) < 2 {
return &requestInfo, nil
}
if currentParts[0] == "clusters" {
requestInfo.Cluster = currentParts[1]
currentParts = currentParts[2:]
}
if len(currentParts) < 3 {
return &requestInfo, nil
}
requestInfo.APIGroup = currentParts[0]
currentParts = currentParts[1:]
}
requestInfo.IsResourceRequest = true
requestInfo.APIVersion = currentParts[0]
currentParts = currentParts[1:]
if specialVerbs.Has(currentParts[0]) {
if len(currentParts) < 2 {
return &requestInfo, fmt.Errorf("unable to determine kind and namespace from url: %v", req.URL)
}
requestInfo.Verb = currentParts[0]
currentParts = currentParts[1:]
} else {
switch req.Method {
case "POST":
requestInfo.Verb = "create"
case "GET", "HEAD":
requestInfo.Verb = "get"
case "PUT":
requestInfo.Verb = "update"
case "PATCH":
requestInfo.Verb = "patch"
case "DELETE":
requestInfo.Verb = "delete"
default:
requestInfo.Verb = ""
}
}
return &requestInfo, nil
}
type requestInfoKeyType int
// requestInfoKey is the RequestInfo key for the context. It's of private type here. Because
// keys are interfaces and interfaces are equal when the type and the value is equal, this
// does not conflict with the keys defined in pkg/api.
const requestInfoKey requestInfoKeyType = iota
func WithRequestInfo(parent context.Context, info *RequestInfo) context.Context {
return k8srequest.WithValue(parent, requestInfoKey, info)
}
func RequestInfoFrom(ctx context.Context) (*RequestInfo, bool) {
info, ok := ctx.Value(requestInfoKey).(*RequestInfo)
return info, ok
}
// splitPath returns the segments for a URL path.
func splitPath(path string) []string {
path = strings.Trim(path, "/")
if path == "" {
return []string{}
}
return strings.Split(path, "/")
}
package metrics
import (
"fmt"
"github.com/emicklei/go-restful"
"github.com/kiali/kiali/handlers"
)
// Get app metrics
func GetAppMetrics(request *restful.Request, response *restful.Response) {
handlers.AppMetrics(request, response)
}
// Get workload metrics
func GetWorkloadMetrics(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
workload := request.PathParameter("workload")
if len(namespace) > 0 && len(workload) > 0 {
request.Request.URL.RawQuery = fmt.Sprintf("%s&namespaces=%s&workload=%s", request.Request.URL.RawQuery, namespace, workload)
}
handlers.WorkloadMetrics(request, response)
}
// Get service metrics
func GetServiceMetrics(request *restful.Request, response *restful.Response) {
handlers.ServiceMetrics(request, response)
}
// Get namespace metrics
func GetNamespaceMetrics(request *restful.Request, response *restful.Response) {
handlers.NamespaceMetrics(request, response)
}
// Get service graph for namespace
func GetNamespaceGraph(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
if len(namespace) > 0 {
request.Request.URL.RawQuery = fmt.Sprintf("%s&namespaces=%s", request.Request.URL.RawQuery, namespace)
}
handlers.GetNamespaceGraph(request, response)
}
// Get service graph for namespaces
func GetNamespacesGraph(request *restful.Request, response *restful.Response) {
handlers.GraphNamespaces(request, response)
}
// Get namespace health
func GetNamespaceHealth(request *restful.Request, response *restful.Response) {
handlers.NamespaceHealth(request, response)
}
// Get workload health
func GetWorkloadHealth(request *restful.Request, response *restful.Response) {
handlers.WorkloadHealth(request, response)
}
// Get app health
func GetAppHealth(request *restful.Request, response *restful.Response) {
handlers.AppHealth(request, response)
}
// Get service health
func GetServiceHealth(request *restful.Request, response *restful.Response) {
handlers.ServiceHealth(request, response)
}
package tracing
import (
"fmt"
"github.com/emicklei/go-restful"
"io/ioutil"
"log"
"net/http"
)
var JaegerQueryUrl = "http://jaeger-query.istio-system.svc:16686/jaeger"
func GetServiceTracing(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
service := request.PathParameter("service")
serviceName := fmt.Sprintf("%s.%s", service, namespace)
url := fmt.Sprintf("%s/api/traces?%s&service=%s", JaegerQueryUrl, request.Request.URL.RawQuery, serviceName)
resp, err := http.Get(url)
if err != nil {
log.Printf("query jaeger faile with err %v", err)
response.WriteError(http.StatusInternalServerError, err)
return
}
body, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
log.Printf("read response error : %v", err)
response.WriteError(http.StatusInternalServerError, err)
return
}
// need to set header for proper response
response.Header().Set("Content-Type", "application/json")
_, err = response.Write(body)
if err != nil {
log.Printf("write response failed %v", err)
}
}
......@@ -38,7 +38,7 @@ const (
ok = "OK"
)
var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha2"}
var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha3"}
func AddToContainer(c *restful.Container, informerFactory informers.InformerFactory) error {
......
package v1alpha2
import (
"fmt"
"github.com/emicklei/go-restful"
"github.com/kiali/kiali/handlers"
"io/ioutil"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/api"
"net/http"
)
var JaegerQueryUrl = "http://jaeger-query.istio-system.svc:16686/jaeger"
// Get app metrics
func getAppMetrics(request *restful.Request, response *restful.Response) {
handlers.AppMetrics(request, response)
}
// Get workload metrics
func getWorkloadMetrics(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
workload := request.PathParameter("workload")
if len(namespace) > 0 && len(workload) > 0 {
request.Request.URL.RawQuery = fmt.Sprintf("%s&namespaces=%s&workload=%s", request.Request.URL.RawQuery, namespace, workload)
}
handlers.WorkloadMetrics(request, response)
}
// Get service metrics
func getServiceMetrics(request *restful.Request, response *restful.Response) {
handlers.ServiceMetrics(request, response)
}
// Get namespace metrics
func getNamespaceMetrics(request *restful.Request, response *restful.Response) {
handlers.NamespaceMetrics(request, response)
}
// Get service graph for namespace
func getNamespaceGraph(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
if len(namespace) > 0 {
request.Request.URL.RawQuery = fmt.Sprintf("%s&namespaces=%s", request.Request.URL.RawQuery, namespace)
}
handlers.GetNamespaceGraph(request, response)
}
// Get service graph for namespaces
func getNamespacesGraph(request *restful.Request, response *restful.Response) {
handlers.GraphNamespaces(request, response)
}
// Get namespace health
func getNamespaceHealth(request *restful.Request, response *restful.Response) {
handlers.NamespaceHealth(request, response)
}
// Get workload health
func getWorkloadHealth(request *restful.Request, response *restful.Response) {
handlers.WorkloadHealth(request, response)
}
// Get app health
func getAppHealth(request *restful.Request, response *restful.Response) {
handlers.AppHealth(request, response)
}
// Get service health
func getServiceHealth(request *restful.Request, response *restful.Response) {
handlers.ServiceHealth(request, response)
}
func getServiceTracing(request *restful.Request, response *restful.Response) {
namespace := request.PathParameter("namespace")
service := request.PathParameter("service")
serviceName := fmt.Sprintf("%s.%s", service, namespace)
url := fmt.Sprintf("%s/api/traces?%s&service=%s", JaegerQueryUrl, request.Request.URL.RawQuery, serviceName)
resp, err := http.Get(url)
if err != nil {
klog.Errorf("query jaeger faile with err %v", err)
api.HandleInternalError(response, err)
return
}
body, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
klog.Errorf("read response error : %v", err)
api.HandleInternalError(response, err)
return
}
// need to set header for proper response
response.Header().Set("Content-Type", "application/json")
_, err = response.Write(body)
if err != nil {
klog.Errorf("write response failed %v", err)
}
}
......@@ -5,8 +5,6 @@ import (
"github.com/emicklei/go-restful-openapi"
"k8s.io/apimachinery/pkg/runtime/schema"
"kubesphere.io/kubesphere/pkg/apiserver/runtime"
"kubesphere.io/kubesphere/pkg/apiserver/servicemesh/metrics"
"kubesphere.io/kubesphere/pkg/apiserver/servicemesh/tracing"
"net/http"
)
......@@ -14,12 +12,7 @@ const GroupName = "servicemesh.kubesphere.io"
var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha2"}
var (
WebServiceBuilder = runtime.NewContainerBuilder(addWebService)
AddToContainer = WebServiceBuilder.AddToContainer
)
func addWebService(c *restful.Container) error {
func AddToContainer(c *restful.Container) error {
tags := []string{"ServiceMesh"}
......@@ -28,7 +21,7 @@ func addWebService(c *restful.Container) error {
// Get service metrics
// GET /namespaces/{namespace}/services/{service}/metrics
webservice.Route(webservice.GET("/namespaces/{namespace}/services/{service}/metrics").
To(metrics.GetServiceMetrics).
To(getServiceMetrics).
Metadata(restfulspec.KeyOpenAPITags, tags).
Doc("Get service metrics from a specific namespace").
Param(webservice.PathParameter("namespace", "name of the namespace")).
......@@ -49,7 +42,7 @@ func addWebService(c *restful.Container) error {
// Get app metrics
// Get /namespaces/{namespace}/apps/{app}/metrics
webservice.Route(webservice.GET("/namespaces/{namespace}/apps/{app}/metrics").
To(metrics.GetAppMetrics).
To(getAppMetrics).
Metadata(restfulspec.KeyOpenAPITags, tags).
Doc("Get app metrics from a specific namespace").
Param(webservice.PathParameter("namespace", "name of the namespace")).
......@@ -71,7 +64,7 @@ func addWebService(c *restful.Container) error {
// Get workload metrics
// Get /namespaces/{namespace}/workloads/{workload}/metrics
webservice.Route(webservice.GET("/namespaces/{namespace}/workloads/{workload}/metrics").
To(metrics.GetWorkloadMetrics).
To(getWorkloadMetrics).
Metadata(restfulspec.KeyOpenAPITags, tags).
Doc("Get workload metrics from a specific namespace").
Param(webservice.PathParameter("namespace", "name of the namespace").Required(true)).
......@@ -94,7 +87,7 @@ func addWebService(c *restful.Container) error {
// Get namespace metrics
// Get /namespaces/{namespace}/metrics
webservice.Route(webservice.GET("/namespaces/{namespace}/metrics").
To(metrics.GetNamespaceMetrics).
To(getNamespaceMetrics).
Metadata(restfulspec.KeyOpenAPITags, tags).
Doc("Get metrics from a specific namespace").
Param(webservice.PathParameter("namespace", "name of the namespace").Required(true)).
......@@ -115,7 +108,7 @@ func addWebService(c *restful.Container) error {
// Get namespace graph
// Get /namespaces/{namespace}/graph
webservice.Route(webservice.GET("/namespaces/{namespace}/graph").
To(metrics.GetNamespaceGraph).
To(getNamespaceGraph).
Metadata(restfulspec.KeyOpenAPITags, tags).
Doc("Get service graph for a specific namespace").
Param(webservice.PathParameter("namespace", "name of a namespace").Required(true)).
......@@ -132,7 +125,7 @@ func addWebService(c *restful.Container) error {
// Get namespaces graph, for multiple namespaces
// Get /namespaces/graph
webservice.Route(webservice.GET("/namespaces/graph").
To(metrics.GetNamespacesGraph).
To(getNamespacesGraph).
Metadata(restfulspec.KeyOpenAPITags, tags).
Doc("Get graph from all namespaces").
Param(webservice.QueryParameter("duration", "duration of the query period, in seconds").DefaultValue("10m")).
......@@ -147,7 +140,7 @@ func addWebService(c *restful.Container) error {
// Get namespace health
webservice.Route(webservice.GET("/namespaces/{namespace}/health").
To(metrics.GetNamespaceHealth).
To(getNamespaceHealth).
Metadata(restfulspec.KeyOpenAPITags, tags).
Doc("Get app/service/workload health of a namespace").
Param(webservice.PathParameter("namespace", "name of a namespace").Required(true)).
......@@ -161,7 +154,7 @@ func addWebService(c *restful.Container) error {
// Get workloads health
webservice.Route(webservice.GET("/namespaces/{namespace}/workloads/{workload}/health").
To(metrics.GetWorkloadHealth).
To(getWorkloadHealth).
Metadata(restfulspec.KeyOpenAPITags, tags).
Doc("Get workload health").
Param(webservice.PathParameter("namespace", "name of a namespace").Required(true)).
......@@ -173,7 +166,7 @@ func addWebService(c *restful.Container) error {
// Get app health
webservice.Route(webservice.GET("/namespaces/{namespace}/apps/{app}/health").
To(metrics.GetAppHealth).
To(getAppHealth).
Metadata(restfulspec.KeyOpenAPITags, tags).
Doc("Get app health").
Param(webservice.PathParameter("namespace", "name of a namespace").Required(true)).
......@@ -185,7 +178,7 @@ func addWebService(c *restful.Container) error {
// Get service health
webservice.Route(webservice.GET("/namespaces/{namespace}/services/{service}/health").
To(metrics.GetServiceHealth).
To(getServiceHealth).
Metadata(restfulspec.KeyOpenAPITags, tags).
Doc("Get service health").
Param(webservice.PathParameter("namespace", "name of a namespace").Required(true)).
......@@ -197,7 +190,7 @@ func addWebService(c *restful.Container) error {
// Get service tracing
webservice.Route(webservice.GET("/namespaces/{namespace}/services/{service}/traces").
To(tracing.GetServiceTracing).
To(getServiceTracing).
Doc("Get tracing of a service, should have servicemesh enabled first").
Metadata(restfulspec.KeyOpenAPITags, tags).
Param(webservice.PathParameter("namespace", "namespace of service").Required(true)).
......
......@@ -53,12 +53,16 @@ type ResourceGetter struct {
}
func (r ResourceGetter) Add(resource string, getter v1alpha2.Interface) {
if r.resourcesGetters == nil {
r.resourcesGetters = make(map[string]v1alpha2.Interface)
}
r.resourcesGetters[resource] = getter
}
func NewResourceGetter(factory informers.InformerFactory) *ResourceGetter {
resourceGetters := make(map[string]v1alpha2.Interface)
//resourceGetters[v1alpha2.Deployments] = deployments
resourceGetters[v1alpha2.ConfigMaps] = configmap.NewConfigmapSearcher(factory.KubernetesSharedInformerFactory())
resourceGetters[v1alpha2.CronJobs] = cronjob.NewCronJobSearcher(factory.KubernetesSharedInformerFactory())
resourceGetters[v1alpha2.DaemonSets] = daemonset.NewDaemonSetSearcher(factory.KubernetesSharedInformerFactory())
......@@ -135,7 +139,7 @@ func (r *ResourceGetter) ListResources(namespace, resource string, conditions *p
limit = len(result) - offset
}
result = result[offset : offset+limit]
items = result[offset : offset+limit]
return &models.PageableResponse{TotalCount: len(result), Items: items}, nil
}
......@@ -66,7 +66,7 @@ func NewRouterOperator(client kubernetes.Interface, informers informers.SharedIn
routerTemplates := make(map[string]runtime.Object, 2)
if err != nil {
klog.Fatalf("error happened during loading external yamls, %v", err)
klog.Errorf("error happened during loading external yamls, %v", err)
}
for _, f := range yamls {
......
/*
Copyright 2019 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filter
import (
"k8s.io/klog"
"net"
"strings"
"time"
"github.com/emicklei/go-restful"
)
func Logging(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
start := time.Now()
chain.ProcessFilter(req, resp)
klog.V(4).Infof("%s - \"%s %s %s\" %d %d %dms",
getRequestIP(req),
req.Request.Method,
req.Request.RequestURI,
req.Request.Proto,
resp.StatusCode(),
resp.ContentLength(),
time.Since(start)/time.Millisecond,
)
}
func getRequestIP(req *restful.Request) string {
address := strings.Trim(req.Request.Header.Get("X-Real-Ip"), " ")
if address != "" {
return address
}
address = strings.Trim(req.Request.Header.Get("X-Forwarded-For"), " ")
if address != "" {
return address
}
address, _, err := net.SplitHostPort(req.Request.RemoteAddr)
if err != nil {
return req.Request.RemoteAddr
}
return address
}
package server
import (
"bytes"
"fmt"
"k8s.io/klog"
"net/http"
"runtime"
)
func LogStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter) {
var buffer bytes.Buffer
buffer.WriteString(fmt.Sprintf("recover from panic situation: - %v\r\n", panicReason))
for i := 2; ; i += 1 {
_, file, line, ok := runtime.Caller(i)
if !ok {
break
}
buffer.WriteString(fmt.Sprintf(" %s:%d\r\n", file, line))
}
klog.Error(buffer.String())
httpWriter.WriteHeader(http.StatusInternalServerError)
httpWriter.Write([]byte("recover from panic situation"))
}
Copyright (c) 2014 The Go-FlowRate Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the
distribution.
* Neither the name of the go-flowrate project nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Written by Maxim Khitrov (November 2012)
//
// Package flowrate provides the tools for monitoring and limiting the flow rate
// of an arbitrary data stream.
package flowrate
import (
"math"
"sync"
"time"
)
// Monitor monitors and limits the transfer rate of a data stream.
type Monitor struct {
mu sync.Mutex // Mutex guarding access to all internal fields
active bool // Flag indicating an active transfer
start time.Duration // Transfer start time (clock() value)
bytes int64 // Total number of bytes transferred
samples int64 // Total number of samples taken
rSample float64 // Most recent transfer rate sample (bytes per second)
rEMA float64 // Exponential moving average of rSample
rPeak float64 // Peak transfer rate (max of all rSamples)
rWindow float64 // rEMA window (seconds)
sBytes int64 // Number of bytes transferred since sLast
sLast time.Duration // Most recent sample time (stop time when inactive)
sRate time.Duration // Sampling rate
tBytes int64 // Number of bytes expected in the current transfer
tLast time.Duration // Time of the most recent transfer of at least 1 byte
}
// New creates a new flow control monitor. Instantaneous transfer rate is
// measured and updated for each sampleRate interval. windowSize determines the
// weight of each sample in the exponential moving average (EMA) calculation.
// The exact formulas are:
//
// sampleTime = currentTime - prevSampleTime
// sampleRate = byteCount / sampleTime
// weight = 1 - exp(-sampleTime/windowSize)
// newRate = weight*sampleRate + (1-weight)*oldRate
//
// The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s,
// respectively.
func New(sampleRate, windowSize time.Duration) *Monitor {
if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
sampleRate = 5 * clockRate
}
if windowSize <= 0 {
windowSize = 1 * time.Second
}
now := clock()
return &Monitor{
active: true,
start: now,
rWindow: windowSize.Seconds(),
sLast: now,
sRate: sampleRate,
tLast: now,
}
}
// Update records the transfer of n bytes and returns n. It should be called
// after each Read/Write operation, even if n is 0.
func (m *Monitor) Update(n int) int {
m.mu.Lock()
m.update(n)
m.mu.Unlock()
return n
}
// IO is a convenience method intended to wrap io.Reader and io.Writer method
// execution. It calls m.Update(n) and then returns (n, err) unmodified.
func (m *Monitor) IO(n int, err error) (int, error) {
return m.Update(n), err
}
// Done marks the transfer as finished and prevents any further updates or
// limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and
// Limit methods become NOOPs. It returns the total number of bytes transferred.
func (m *Monitor) Done() int64 {
m.mu.Lock()
if now := m.update(0); m.sBytes > 0 {
m.reset(now)
}
m.active = false
m.tLast = 0
n := m.bytes
m.mu.Unlock()
return n
}
// timeRemLimit is the maximum Status.TimeRem value.
const timeRemLimit = 999*time.Hour + 59*time.Minute + 59*time.Second
// Status represents the current Monitor status. All transfer rates are in bytes
// per second rounded to the nearest byte.
type Status struct {
Active bool // Flag indicating an active transfer
Start time.Time // Transfer start time
Duration time.Duration // Time period covered by the statistics
Idle time.Duration // Time since the last transfer of at least 1 byte
Bytes int64 // Total number of bytes transferred
Samples int64 // Total number of samples taken
InstRate int64 // Instantaneous transfer rate
CurRate int64 // Current transfer rate (EMA of InstRate)
AvgRate int64 // Average transfer rate (Bytes / Duration)
PeakRate int64 // Maximum instantaneous transfer rate
BytesRem int64 // Number of bytes remaining in the transfer
TimeRem time.Duration // Estimated time to completion
Progress Percent // Overall transfer progress
}
// Status returns current transfer status information. The returned value
// becomes static after a call to Done.
func (m *Monitor) Status() Status {
m.mu.Lock()
now := m.update(0)
s := Status{
Active: m.active,
Start: clockToTime(m.start),
Duration: m.sLast - m.start,
Idle: now - m.tLast,
Bytes: m.bytes,
Samples: m.samples,
PeakRate: round(m.rPeak),
BytesRem: m.tBytes - m.bytes,
Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
}
if s.BytesRem < 0 {
s.BytesRem = 0
}
if s.Duration > 0 {
rAvg := float64(s.Bytes) / s.Duration.Seconds()
s.AvgRate = round(rAvg)
if s.Active {
s.InstRate = round(m.rSample)
s.CurRate = round(m.rEMA)
if s.BytesRem > 0 {
if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
ns := float64(s.BytesRem) / tRate * 1e9
if ns > float64(timeRemLimit) {
ns = float64(timeRemLimit)
}
s.TimeRem = clockRound(time.Duration(ns))
}
}
}
}
m.mu.Unlock()
return s
}
// Limit restricts the instantaneous (per-sample) data flow to rate bytes per
// second. It returns the maximum number of bytes (0 <= n <= want) that may be
// transferred immediately without exceeding the limit. If block == true, the
// call blocks until n > 0. want is returned unmodified if want < 1, rate < 1,
// or the transfer is inactive (after a call to Done).
//
// At least one byte is always allowed to be transferred in any given sampling
// period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate
// is 10 bytes per second.
//
// For usage examples, see the implementation of Reader and Writer in io.go.
func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
if want < 1 || rate < 1 {
return want
}
m.mu.Lock()
// Determine the maximum number of bytes that can be sent in one sample
limit := round(float64(rate) * m.sRate.Seconds())
if limit <= 0 {
limit = 1
}
// If block == true, wait until m.sBytes < limit
if now := m.update(0); block {
for m.sBytes >= limit && m.active {
now = m.waitNextSample(now)
}
}
// Make limit <= want (unlimited if the transfer is no longer active)
if limit -= m.sBytes; limit > int64(want) || !m.active {
limit = int64(want)
}
m.mu.Unlock()
if limit < 0 {
limit = 0
}
return int(limit)
}
// SetTransferSize specifies the total size of the data transfer, which allows
// the Monitor to calculate the overall progress and time to completion.
func (m *Monitor) SetTransferSize(bytes int64) {
if bytes < 0 {
bytes = 0
}
m.mu.Lock()
m.tBytes = bytes
m.mu.Unlock()
}
// update accumulates the transferred byte count for the current sample until
// clock() - m.sLast >= m.sRate. The monitor status is updated once the current
// sample is done.
func (m *Monitor) update(n int) (now time.Duration) {
if !m.active {
return
}
if now = clock(); n > 0 {
m.tLast = now
}
m.sBytes += int64(n)
if sTime := now - m.sLast; sTime >= m.sRate {
t := sTime.Seconds()
if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
m.rPeak = m.rSample
}
// Exponential moving average using a method similar to *nix load
// average calculation. Longer sampling periods carry greater weight.
if m.samples > 0 {
w := math.Exp(-t / m.rWindow)
m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
} else {
m.rEMA = m.rSample
}
m.reset(now)
}
return
}
// reset clears the current sample state in preparation for the next sample.
func (m *Monitor) reset(sampleTime time.Duration) {
m.bytes += m.sBytes
m.samples++
m.sBytes = 0
m.sLast = sampleTime
}
// waitNextSample sleeps for the remainder of the current sample. The lock is
// released and reacquired during the actual sleep period, so it's possible for
// the transfer to be inactive when this method returns.
func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
const minWait = 5 * time.Millisecond
current := m.sLast
// sleep until the last sample time changes (ideally, just one iteration)
for m.sLast == current && m.active {
d := current + m.sRate - now
m.mu.Unlock()
if d < minWait {
d = minWait
}
time.Sleep(d)
m.mu.Lock()
now = m.update(0)
}
return now
}
//
// Written by Maxim Khitrov (November 2012)
//
package flowrate
import (
"errors"
"io"
)
// ErrLimit is returned by the Writer when a non-blocking write is short due to
// the transfer rate limit.
var ErrLimit = errors.New("flowrate: flow rate limit exceeded")
// Limiter is implemented by the Reader and Writer to provide a consistent
// interface for monitoring and controlling data transfer.
type Limiter interface {
Done() int64
Status() Status
SetTransferSize(bytes int64)
SetLimit(new int64) (old int64)
SetBlocking(new bool) (old bool)
}
// Reader implements io.ReadCloser with a restriction on the rate of data
// transfer.
type Reader struct {
io.Reader // Data source
*Monitor // Flow control monitor
limit int64 // Rate limit in bytes per second (unlimited when <= 0)
block bool // What to do when no new bytes can be read due to the limit
}
// NewReader restricts all Read operations on r to limit bytes per second.
func NewReader(r io.Reader, limit int64) *Reader {
return &Reader{r, New(0, 0), limit, true}
}
// Read reads up to len(p) bytes into p without exceeding the current transfer
// rate limit. It returns (0, nil) immediately if r is non-blocking and no new
// bytes can be read at this time.
func (r *Reader) Read(p []byte) (n int, err error) {
p = p[:r.Limit(len(p), r.limit, r.block)]
if len(p) > 0 {
n, err = r.IO(r.Reader.Read(p))
}
return
}
// SetLimit changes the transfer rate limit to new bytes per second and returns
// the previous setting.
func (r *Reader) SetLimit(new int64) (old int64) {
old, r.limit = r.limit, new
return
}
// SetBlocking changes the blocking behavior and returns the previous setting. A
// Read call on a non-blocking reader returns immediately if no additional bytes
// may be read at this time due to the rate limit.
func (r *Reader) SetBlocking(new bool) (old bool) {
old, r.block = r.block, new
return
}
// Close closes the underlying reader if it implements the io.Closer interface.
func (r *Reader) Close() error {
defer r.Done()
if c, ok := r.Reader.(io.Closer); ok {
return c.Close()
}
return nil
}
// Writer implements io.WriteCloser with a restriction on the rate of data
// transfer.
type Writer struct {
io.Writer // Data destination
*Monitor // Flow control monitor
limit int64 // Rate limit in bytes per second (unlimited when <= 0)
block bool // What to do when no new bytes can be written due to the limit
}
// NewWriter restricts all Write operations on w to limit bytes per second. The
// transfer rate and the default blocking behavior (true) can be changed
// directly on the returned *Writer.
func NewWriter(w io.Writer, limit int64) *Writer {
return &Writer{w, New(0, 0), limit, true}
}
// Write writes len(p) bytes from p to the underlying data stream without
// exceeding the current transfer rate limit. It returns (n, ErrLimit) if w is
// non-blocking and no additional bytes can be written at this time.
func (w *Writer) Write(p []byte) (n int, err error) {
var c int
for len(p) > 0 && err == nil {
s := p[:w.Limit(len(p), w.limit, w.block)]
if len(s) > 0 {
c, err = w.IO(w.Writer.Write(s))
} else {
return n, ErrLimit
}
p = p[c:]
n += c
}
return
}
// SetLimit changes the transfer rate limit to new bytes per second and returns
// the previous setting.
func (w *Writer) SetLimit(new int64) (old int64) {
old, w.limit = w.limit, new
return
}
// SetBlocking changes the blocking behavior and returns the previous setting. A
// Write call on a non-blocking writer returns as soon as no additional bytes
// may be written at this time due to the rate limit.
func (w *Writer) SetBlocking(new bool) (old bool) {
old, w.block = w.block, new
return
}
// Close closes the underlying writer if it implements the io.Closer interface.
func (w *Writer) Close() error {
defer w.Done()
if c, ok := w.Writer.(io.Closer); ok {
return c.Close()
}
return nil
}
//
// Written by Maxim Khitrov (November 2012)
//
package flowrate
import (
"math"
"strconv"
"time"
)
// clockRate is the resolution and precision of clock().
const clockRate = 20 * time.Millisecond
// czero is the process start time rounded down to the nearest clockRate
// increment.
var czero = time.Duration(time.Now().UnixNano()) / clockRate * clockRate
// clock returns a low resolution timestamp relative to the process start time.
func clock() time.Duration {
return time.Duration(time.Now().UnixNano())/clockRate*clockRate - czero
}
// clockToTime converts a clock() timestamp to an absolute time.Time value.
func clockToTime(c time.Duration) time.Time {
return time.Unix(0, int64(czero+c))
}
// clockRound returns d rounded to the nearest clockRate increment.
func clockRound(d time.Duration) time.Duration {
return (d + clockRate>>1) / clockRate * clockRate
}
// round returns x rounded to the nearest int64 (non-negative values only).
func round(x float64) int64 {
if _, frac := math.Modf(x); frac >= 0.5 {
return int64(math.Ceil(x))
}
return int64(math.Floor(x))
}
// Percent represents a percentage in increments of 1/1000th of a percent.
type Percent uint32
// percentOf calculates what percent of the total is x.
func percentOf(x, total float64) Percent {
if x < 0 || total <= 0 {
return 0
} else if p := round(x / total * 1e5); p <= math.MaxUint32 {
return Percent(p)
}
return Percent(math.MaxUint32)
}
func (p Percent) Float() float64 {
return float64(p) * 1e-3
}
func (p Percent) String() string {
var buf [12]byte
b := strconv.AppendUint(buf[:0], uint64(p)/1000, 10)
n := len(b)
b = strconv.AppendUint(b, 1000+uint64(p)%1000, 10)
b[n] = '.'
return string(append(b, '%'))
}
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"k8s.io/klog"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/third_party/forked/golang/netutil"
)
func DialURL(ctx context.Context, url *url.URL, transport http.RoundTripper) (net.Conn, error) {
dialAddr := netutil.CanonicalAddr(url)
dialer, err := utilnet.DialerFor(transport)
if err != nil {
klog.V(5).Infof("Unable to unwrap transport %T to get dialer: %v", transport, err)
}
switch url.Scheme {
case "http":
if dialer != nil {
return dialer(ctx, "tcp", dialAddr)
}
var d net.Dialer
return d.DialContext(ctx, "tcp", dialAddr)
case "https":
// Get the tls config from the transport if we recognize it
var tlsConfig *tls.Config
var tlsConn *tls.Conn
var err error
tlsConfig, err = utilnet.TLSClientConfig(transport)
if err != nil {
klog.V(5).Infof("Unable to unwrap transport %T to get at TLS config: %v", transport, err)
}
if dialer != nil {
// We have a dialer; use it to open the connection, then
// create a tls client using the connection.
netConn, err := dialer(ctx, "tcp", dialAddr)
if err != nil {
return nil, err
}
if tlsConfig == nil {
// tls.Client requires non-nil config
klog.Warningf("using custom dialer with no TLSClientConfig. Defaulting to InsecureSkipVerify")
// tls.Handshake() requires ServerName or InsecureSkipVerify
tlsConfig = &tls.Config{
InsecureSkipVerify: true,
}
} else if len(tlsConfig.ServerName) == 0 && !tlsConfig.InsecureSkipVerify {
// tls.Handshake() requires ServerName or InsecureSkipVerify
// infer the ServerName from the hostname we're connecting to.
inferredHost := dialAddr
if host, _, err := net.SplitHostPort(dialAddr); err == nil {
inferredHost = host
}
// Make a copy to avoid polluting the provided config
tlsConfigCopy := tlsConfig.Clone()
tlsConfigCopy.ServerName = inferredHost
tlsConfig = tlsConfigCopy
}
tlsConn = tls.Client(netConn, tlsConfig)
if err := tlsConn.Handshake(); err != nil {
netConn.Close()
return nil, err
}
} else {
// Dial. This Dial method does not allow to pass a context unfortunately
tlsConn, err = tls.Dial("tcp", dialAddr, tlsConfig)
if err != nil {
return nil, err
}
}
// Return if we were configured to skip validation
if tlsConfig != nil && tlsConfig.InsecureSkipVerify {
return tlsConn, nil
}
// Verify
host, _, _ := net.SplitHostPort(dialAddr)
if tlsConfig != nil && len(tlsConfig.ServerName) > 0 {
host = tlsConfig.ServerName
}
if err := tlsConn.VerifyHostname(host); err != nil {
tlsConn.Close()
return nil, err
}
return tlsConn, nil
default:
return nil, fmt.Errorf("Unknown scheme: %s", url.Scheme)
}
}
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package proxy provides transport and upgrade support for proxies.
package proxy // import "k8s.io/apimachinery/pkg/util/proxy"
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"bytes"
"compress/flate"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"path"
"strings"
"golang.org/x/net/html"
"golang.org/x/net/html/atom"
"k8s.io/klog"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
)
// atomsToAttrs states which attributes of which tags require URL substitution.
// Sources: http://www.w3.org/TR/REC-html40/index/attributes.html
// http://www.w3.org/html/wg/drafts/html/master/index.html#attributes-1
var atomsToAttrs = map[atom.Atom]sets.String{
atom.A: sets.NewString("href"),
atom.Applet: sets.NewString("codebase"),
atom.Area: sets.NewString("href"),
atom.Audio: sets.NewString("src"),
atom.Base: sets.NewString("href"),
atom.Blockquote: sets.NewString("cite"),
atom.Body: sets.NewString("background"),
atom.Button: sets.NewString("formaction"),
atom.Command: sets.NewString("icon"),
atom.Del: sets.NewString("cite"),
atom.Embed: sets.NewString("src"),
atom.Form: sets.NewString("action"),
atom.Frame: sets.NewString("longdesc", "src"),
atom.Head: sets.NewString("profile"),
atom.Html: sets.NewString("manifest"),
atom.Iframe: sets.NewString("longdesc", "src"),
atom.Img: sets.NewString("longdesc", "src", "usemap"),
atom.Input: sets.NewString("src", "usemap", "formaction"),
atom.Ins: sets.NewString("cite"),
atom.Link: sets.NewString("href"),
atom.Object: sets.NewString("classid", "codebase", "data", "usemap"),
atom.Q: sets.NewString("cite"),
atom.Script: sets.NewString("src"),
atom.Source: sets.NewString("src"),
atom.Video: sets.NewString("poster", "src"),
// TODO: css URLs hidden in style elements.
}
// Transport is a transport for text/html content that replaces URLs in html
// content with the prefix of the proxy server
type Transport struct {
Scheme string
Host string
PathPrepend string
http.RoundTripper
}
// RoundTrip implements the http.RoundTripper interface
func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
// Add reverse proxy headers.
forwardedURI := path.Join(t.PathPrepend, req.URL.Path)
if strings.HasSuffix(req.URL.Path, "/") {
forwardedURI = forwardedURI + "/"
}
req.Header.Set("X-Forwarded-Uri", forwardedURI)
if len(t.Host) > 0 {
req.Header.Set("X-Forwarded-Host", t.Host)
}
if len(t.Scheme) > 0 {
req.Header.Set("X-Forwarded-Proto", t.Scheme)
}
rt := t.RoundTripper
if rt == nil {
rt = http.DefaultTransport
}
resp, err := rt.RoundTrip(req)
if err != nil {
message := fmt.Sprintf("Error trying to reach service: '%v'", err.Error())
resp = &http.Response{
Header: http.Header{},
StatusCode: http.StatusServiceUnavailable,
Body: ioutil.NopCloser(strings.NewReader(message)),
}
resp.Header.Set("Content-Type", "text/plain; charset=utf-8")
resp.Header.Set("X-Content-Type-Options", "nosniff")
return resp, nil
}
if redirect := resp.Header.Get("Location"); redirect != "" {
resp.Header.Set("Location", t.rewriteURL(redirect, req.URL, req.Host))
return resp, nil
}
cType := resp.Header.Get("Content-Type")
cType = strings.TrimSpace(strings.SplitN(cType, ";", 2)[0])
if cType != "text/html" {
// Do nothing, simply pass through
return resp, nil
}
return t.rewriteResponse(req, resp)
}
var _ = net.RoundTripperWrapper(&Transport{})
func (rt *Transport) WrappedRoundTripper() http.RoundTripper {
return rt.RoundTripper
}
// rewriteURL rewrites a single URL to go through the proxy, if the URL refers
// to the same host as sourceURL, which is the page on which the target URL
// occurred, or if the URL matches the sourceRequestHost. If any error occurs (e.g.
// parsing), it returns targetURL.
func (t *Transport) rewriteURL(targetURL string, sourceURL *url.URL, sourceRequestHost string) string {
url, err := url.Parse(targetURL)
if err != nil {
return targetURL
}
// Example:
// When API server processes a proxy request to a service (e.g. /api/v1/namespace/foo/service/bar/proxy/),
// the sourceURL.Host (i.e. req.URL.Host) is the endpoint IP address of the service. The
// sourceRequestHost (i.e. req.Host) is the Host header that specifies the host on which the
// URL is sought, which can be different from sourceURL.Host. For example, if user sends the
// request through "kubectl proxy" locally (i.e. localhost:8001/api/v1/namespace/foo/service/bar/proxy/),
// sourceRequestHost is "localhost:8001".
//
// If the service's response URL contains non-empty host, and url.Host is equal to either sourceURL.Host
// or sourceRequestHost, we should not consider the returned URL to be a completely different host.
// It's the API server's responsibility to rewrite a same-host-and-absolute-path URL and append the
// necessary URL prefix (i.e. /api/v1/namespace/foo/service/bar/proxy/).
isDifferentHost := url.Host != "" && url.Host != sourceURL.Host && url.Host != sourceRequestHost
isRelative := !strings.HasPrefix(url.Path, "/")
if isDifferentHost || isRelative {
return targetURL
}
// Do not rewrite scheme and host if the Transport has empty scheme and host
// when targetURL already contains the sourceRequestHost
if !(url.Host == sourceRequestHost && t.Scheme == "" && t.Host == "") {
url.Scheme = t.Scheme
url.Host = t.Host
}
origPath := url.Path
// Do not rewrite URL if the sourceURL already contains the necessary prefix.
if strings.HasPrefix(url.Path, t.PathPrepend) {
return url.String()
}
url.Path = path.Join(t.PathPrepend, url.Path)
if strings.HasSuffix(origPath, "/") {
// Add back the trailing slash, which was stripped by path.Join().
url.Path += "/"
}
return url.String()
}
// rewriteHTML scans the HTML for tags with url-valued attributes, and updates
// those values with the urlRewriter function. The updated HTML is output to the
// writer.
func rewriteHTML(reader io.Reader, writer io.Writer, urlRewriter func(string) string) error {
// Note: This assumes the content is UTF-8.
tokenizer := html.NewTokenizer(reader)
var err error
for err == nil {
tokenType := tokenizer.Next()
switch tokenType {
case html.ErrorToken:
err = tokenizer.Err()
case html.StartTagToken, html.SelfClosingTagToken:
token := tokenizer.Token()
if urlAttrs, ok := atomsToAttrs[token.DataAtom]; ok {
for i, attr := range token.Attr {
if urlAttrs.Has(attr.Key) {
token.Attr[i].Val = urlRewriter(attr.Val)
}
}
}
_, err = writer.Write([]byte(token.String()))
default:
_, err = writer.Write(tokenizer.Raw())
}
}
if err != io.EOF {
return err
}
return nil
}
// rewriteResponse modifies an HTML response by updating absolute links referring
// to the original host to instead refer to the proxy transport.
func (t *Transport) rewriteResponse(req *http.Request, resp *http.Response) (*http.Response, error) {
origBody := resp.Body
defer origBody.Close()
newContent := &bytes.Buffer{}
var reader io.Reader = origBody
var writer io.Writer = newContent
encoding := resp.Header.Get("Content-Encoding")
switch encoding {
case "gzip":
var err error
reader, err = gzip.NewReader(reader)
if err != nil {
return nil, fmt.Errorf("errorf making gzip reader: %v", err)
}
gzw := gzip.NewWriter(writer)
defer gzw.Close()
writer = gzw
case "deflate":
var err error
reader = flate.NewReader(reader)
flw, err := flate.NewWriter(writer, flate.BestCompression)
if err != nil {
return nil, fmt.Errorf("errorf making flate writer: %v", err)
}
defer func() {
flw.Close()
flw.Flush()
}()
writer = flw
case "":
// This is fine
default:
// Some encoding we don't understand-- don't try to parse this
klog.Errorf("Proxy encountered encoding %v for text/html; can't understand this so not fixing links.", encoding)
return resp, nil
}
urlRewriter := func(targetUrl string) string {
return t.rewriteURL(targetUrl, req.URL, req.Host)
}
err := rewriteHTML(reader, writer, urlRewriter)
if err != nil {
klog.Errorf("Failed to rewrite URLs: %v", err)
return resp, err
}
resp.Body = ioutil.NopCloser(newContent)
// Update header node with new content-length
// TODO: Remove any hash/signature headers here?
resp.Header.Del("Content-Length")
resp.ContentLength = int64(newContent.Len())
return resp, err
}
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/http/httputil"
"net/url"
"os"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/httpstream"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"github.com/mxk/go-flowrate/flowrate"
"k8s.io/klog"
)
// UpgradeRequestRoundTripper provides an additional method to decorate a request
// with any authentication or other protocol level information prior to performing
// an upgrade on the server. Any response will be handled by the intercepting
// proxy.
type UpgradeRequestRoundTripper interface {
http.RoundTripper
// WrapRequest takes a valid HTTP request and returns a suitably altered version
// of request with any HTTP level values required to complete the request half of
// an upgrade on the server. It does not get a chance to see the response and
// should bypass any request side logic that expects to see the response.
WrapRequest(*http.Request) (*http.Request, error)
}
// UpgradeAwareHandler is a handler for proxy requests that may require an upgrade
type UpgradeAwareHandler struct {
// UpgradeRequired will reject non-upgrade connections if true.
UpgradeRequired bool
// Location is the location of the upstream proxy. It is used as the location to Dial on the upstream server
// for upgrade requests unless UseRequestLocationOnUpgrade is true.
Location *url.URL
// Transport provides an optional round tripper to use to proxy. If nil, the default proxy transport is used
Transport http.RoundTripper
// UpgradeTransport, if specified, will be used as the backend transport when upgrade requests are provided.
// This allows clients to disable HTTP/2.
UpgradeTransport UpgradeRequestRoundTripper
// WrapTransport indicates whether the provided Transport should be wrapped with default proxy transport behavior (URL rewriting, X-Forwarded-* header setting)
WrapTransport bool
// InterceptRedirects determines whether the proxy should sniff backend responses for redirects,
// following them as necessary.
InterceptRedirects bool
// RequireSameHostRedirects only allows redirects to the same host. It is only used if InterceptRedirects=true.
RequireSameHostRedirects bool
// UseRequestLocation will use the incoming request URL when talking to the backend server.
UseRequestLocation bool
// FlushInterval controls how often the standard HTTP proxy will flush content from the upstream.
FlushInterval time.Duration
// MaxBytesPerSec controls the maximum rate for an upstream connection. No rate is imposed if the value is zero.
MaxBytesPerSec int64
// Responder is passed errors that occur while setting up proxying.
Responder ErrorResponder
}
const defaultFlushInterval = 200 * time.Millisecond
// ErrorResponder abstracts error reporting to the proxy handler to remove the need to hardcode a particular
// error format.
type ErrorResponder interface {
Error(w http.ResponseWriter, req *http.Request, err error)
}
// SimpleErrorResponder is the legacy implementation of ErrorResponder for callers that only
// service a single request/response per proxy.
type SimpleErrorResponder interface {
Error(err error)
}
func NewErrorResponder(r SimpleErrorResponder) ErrorResponder {
return simpleResponder{r}
}
type simpleResponder struct {
responder SimpleErrorResponder
}
func (r simpleResponder) Error(w http.ResponseWriter, req *http.Request, err error) {
r.responder.Error(err)
}
// upgradeRequestRoundTripper implements proxy.UpgradeRequestRoundTripper.
type upgradeRequestRoundTripper struct {
http.RoundTripper
upgrader http.RoundTripper
}
var (
_ UpgradeRequestRoundTripper = &upgradeRequestRoundTripper{}
_ utilnet.RoundTripperWrapper = &upgradeRequestRoundTripper{}
)
// WrappedRoundTripper returns the round tripper that a caller would use.
func (rt *upgradeRequestRoundTripper) WrappedRoundTripper() http.RoundTripper {
return rt.RoundTripper
}
// WriteToRequest calls the nested upgrader and then copies the returned request
// fields onto the passed request.
func (rt *upgradeRequestRoundTripper) WrapRequest(req *http.Request) (*http.Request, error) {
resp, err := rt.upgrader.RoundTrip(req)
if err != nil {
return nil, err
}
return resp.Request, nil
}
// onewayRoundTripper captures the provided request - which is assumed to have
// been modified by other round trippers - and then returns a fake response.
type onewayRoundTripper struct{}
// RoundTrip returns a simple 200 OK response that captures the provided request.
func (onewayRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return &http.Response{
Status: "200 OK",
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(&bytes.Buffer{}),
Request: req,
}, nil
}
// MirrorRequest is a round tripper that can be called to get back the calling request as
// the core round tripper in a chain.
var MirrorRequest http.RoundTripper = onewayRoundTripper{}
// NewUpgradeRequestRoundTripper takes two round trippers - one for the underlying TCP connection, and
// one that is able to write headers to an HTTP request. The request rt is used to set the request headers
// and that is written to the underlying connection rt.
func NewUpgradeRequestRoundTripper(connection, request http.RoundTripper) UpgradeRequestRoundTripper {
return &upgradeRequestRoundTripper{
RoundTripper: connection,
upgrader: request,
}
}
// normalizeLocation returns the result of parsing the full URL, with scheme set to http if missing
func normalizeLocation(location *url.URL) *url.URL {
normalized, _ := url.Parse(location.String())
if len(normalized.Scheme) == 0 {
normalized.Scheme = "http"
}
return normalized
}
// NewUpgradeAwareHandler creates a new proxy handler with a default flush interval. Responder is required for returning
// errors to the caller.
func NewUpgradeAwareHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder ErrorResponder) *UpgradeAwareHandler {
return &UpgradeAwareHandler{
Location: normalizeLocation(location),
Transport: transport,
WrapTransport: wrapTransport,
UpgradeRequired: upgradeRequired,
FlushInterval: defaultFlushInterval,
Responder: responder,
}
}
// ServeHTTP handles the proxy request
func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if h.tryUpgrade(w, req) {
return
}
if h.UpgradeRequired {
h.Responder.Error(w, req, errors.NewBadRequest("Upgrade request required"))
return
}
loc := *h.Location
loc.RawQuery = req.URL.RawQuery
// If original request URL ended in '/', append a '/' at the end of the
// of the proxy URL
if !strings.HasSuffix(loc.Path, "/") && strings.HasSuffix(req.URL.Path, "/") {
loc.Path += "/"
}
// From pkg/genericapiserver/endpoints/handlers/proxy.go#ServeHTTP:
// Redirect requests with an empty path to a location that ends with a '/'
// This is essentially a hack for http://issue.k8s.io/4958.
// Note: Keep this code after tryUpgrade to not break that flow.
if len(loc.Path) == 0 {
var queryPart string
if len(req.URL.RawQuery) > 0 {
queryPart = "?" + req.URL.RawQuery
}
w.Header().Set("Location", req.URL.Path+"/"+queryPart)
w.WriteHeader(http.StatusMovedPermanently)
return
}
if h.Transport == nil || h.WrapTransport {
h.Transport = h.defaultProxyTransport(req.URL, h.Transport)
}
// WithContext creates a shallow clone of the request with the same context.
newReq := req.WithContext(req.Context())
newReq.Header = utilnet.CloneHeader(req.Header)
if !h.UseRequestLocation {
newReq.URL = &loc
}
proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host})
proxy.Transport = h.Transport
proxy.FlushInterval = h.FlushInterval
proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags)
proxy.ServeHTTP(w, newReq)
}
type noSuppressPanicError struct{}
func (noSuppressPanicError) Write(p []byte) (n int, err error) {
// skip "suppressing panic for copyResponse error in test; copy error" error message
// that ends up in CI tests on each kube-apiserver termination as noise and
// everybody thinks this is fatal.
if strings.Contains(string(p), "suppressing panic") {
return len(p), nil
}
return os.Stderr.Write(p)
}
// tryUpgrade returns true if the request was handled.
func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool {
if !httpstream.IsUpgradeRequest(req) {
klog.V(6).Infof("Request was not an upgrade")
return false
}
var (
backendConn net.Conn
rawResponse []byte
err error
)
location := *h.Location
if h.UseRequestLocation {
location = *req.URL
location.Scheme = h.Location.Scheme
location.Host = h.Location.Host
}
clone := utilnet.CloneRequest(req)
// Only append X-Forwarded-For in the upgrade path, since httputil.NewSingleHostReverseProxy
// handles this in the non-upgrade path.
utilnet.AppendForwardedForHeader(clone)
if h.InterceptRedirects {
klog.V(6).Infof("Connecting to backend proxy (intercepting redirects) %s\n Headers: %v", &location, clone.Header)
backendConn, rawResponse, err = utilnet.ConnectWithRedirects(req.Method, &location, clone.Header, req.Body, utilnet.DialerFunc(h.DialForUpgrade), h.RequireSameHostRedirects)
} else {
klog.V(6).Infof("Connecting to backend proxy (direct dial) %s\n Headers: %v", &location, clone.Header)
clone.URL = &location
backendConn, err = h.DialForUpgrade(clone)
}
if err != nil {
klog.V(6).Infof("Proxy connection error: %v", err)
h.Responder.Error(w, req, err)
return true
}
defer backendConn.Close()
// determine the http response code from the backend by reading from rawResponse+backendConn
backendHTTPResponse, headerBytes, err := getResponse(io.MultiReader(bytes.NewReader(rawResponse), backendConn))
if err != nil {
klog.V(6).Infof("Proxy connection error: %v", err)
h.Responder.Error(w, req, err)
return true
}
if len(headerBytes) > len(rawResponse) {
// we read beyond the bytes stored in rawResponse, update rawResponse to the full set of bytes read from the backend
rawResponse = headerBytes
}
// Once the connection is hijacked, the ErrorResponder will no longer work, so
// hijacking should be the last step in the upgrade.
requestHijacker, ok := w.(http.Hijacker)
if !ok {
klog.V(6).Infof("Unable to hijack response writer: %T", w)
h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w))
return true
}
requestHijackedConn, _, err := requestHijacker.Hijack()
if err != nil {
klog.V(6).Infof("Unable to hijack response: %v", err)
h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err))
return true
}
defer requestHijackedConn.Close()
if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols {
// If the backend did not upgrade the request, echo the response from the backend to the client and return, closing the connection.
klog.V(6).Infof("Proxy upgrade error, status code %d", backendHTTPResponse.StatusCode)
// set read/write deadlines
deadline := time.Now().Add(10 * time.Second)
backendConn.SetReadDeadline(deadline)
requestHijackedConn.SetWriteDeadline(deadline)
// write the response to the client
err := backendHTTPResponse.Write(requestHijackedConn)
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
klog.Errorf("Error proxying data from backend to client: %v", err)
}
// Indicate we handled the request
return true
}
// Forward raw response bytes back to client.
if len(rawResponse) > 0 {
klog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse))
if _, err = requestHijackedConn.Write(rawResponse); err != nil {
utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err))
}
}
// Proxy the connection. This is bidirectional, so we need a goroutine
// to copy in each direction. Once one side of the connection exits, we
// exit the function which performs cleanup and in the process closes
// the other half of the connection in the defer.
writerComplete := make(chan struct{})
readerComplete := make(chan struct{})
go func() {
var writer io.WriteCloser
if h.MaxBytesPerSec > 0 {
writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec)
} else {
writer = backendConn
}
_, err := io.Copy(writer, requestHijackedConn)
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
klog.Errorf("Error proxying data from client to backend: %v", err)
}
close(writerComplete)
}()
go func() {
var reader io.ReadCloser
if h.MaxBytesPerSec > 0 {
reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec)
} else {
reader = backendConn
}
_, err := io.Copy(requestHijackedConn, reader)
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
klog.Errorf("Error proxying data from backend to client: %v", err)
}
close(readerComplete)
}()
// Wait for one half the connection to exit. Once it does the defer will
// clean up the other half of the connection.
select {
case <-writerComplete:
case <-readerComplete:
}
klog.V(6).Infof("Disconnecting from backend proxy %s\n Headers: %v", &location, clone.Header)
return true
}
func (h *UpgradeAwareHandler) Dial(req *http.Request) (net.Conn, error) {
return dial(req, h.Transport)
}
func (h *UpgradeAwareHandler) DialForUpgrade(req *http.Request) (net.Conn, error) {
if h.UpgradeTransport == nil {
return dial(req, h.Transport)
}
updatedReq, err := h.UpgradeTransport.WrapRequest(req)
if err != nil {
return nil, err
}
return dial(updatedReq, h.UpgradeTransport)
}
// getResponseCode reads a http response from the given reader, returns the response,
// the bytes read from the reader, and any error encountered
func getResponse(r io.Reader) (*http.Response, []byte, error) {
rawResponse := bytes.NewBuffer(make([]byte, 0, 256))
// Save the bytes read while reading the response headers into the rawResponse buffer
resp, err := http.ReadResponse(bufio.NewReader(io.TeeReader(r, rawResponse)), nil)
if err != nil {
return nil, nil, err
}
// return the http response and the raw bytes consumed from the reader in the process
return resp, rawResponse.Bytes(), nil
}
// dial dials the backend at req.URL and writes req to it.
func dial(req *http.Request, transport http.RoundTripper) (net.Conn, error) {
conn, err := DialURL(req.Context(), req.URL, transport)
if err != nil {
return nil, fmt.Errorf("error dialing backend: %v", err)
}
if err = req.Write(conn); err != nil {
conn.Close()
return nil, fmt.Errorf("error sending request: %v", err)
}
return conn, err
}
var _ utilnet.Dialer = &UpgradeAwareHandler{}
func (h *UpgradeAwareHandler) defaultProxyTransport(url *url.URL, internalTransport http.RoundTripper) http.RoundTripper {
scheme := url.Scheme
host := url.Host
suffix := h.Location.Path
if strings.HasSuffix(url.Path, "/") && !strings.HasSuffix(suffix, "/") {
suffix += "/"
}
pathPrepend := strings.TrimSuffix(url.Path, suffix)
rewritingTransport := &Transport{
Scheme: scheme,
Host: host,
PathPrepend: pathPrepend,
RoundTripper: internalTransport,
}
return &corsRemovingTransport{
RoundTripper: rewritingTransport,
}
}
// corsRemovingTransport is a wrapper for an internal transport. It removes CORS headers
// from the internal response.
// Implements pkg/util/net.RoundTripperWrapper
type corsRemovingTransport struct {
http.RoundTripper
}
var _ = utilnet.RoundTripperWrapper(&corsRemovingTransport{})
func (rt *corsRemovingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
resp, err := rt.RoundTripper.RoundTrip(req)
if err != nil {
return nil, err
}
removeCORSHeaders(resp)
return resp, nil
}
func (rt *corsRemovingTransport) WrappedRoundTripper() http.RoundTripper {
return rt.RoundTripper
}
// removeCORSHeaders strip CORS headers sent from the backend
// This should be called on all responses before returning
func removeCORSHeaders(resp *http.Response) {
resp.Header.Del("Access-Control-Allow-Credentials")
resp.Header.Del("Access-Control-Allow-Headers")
resp.Header.Del("Access-Control-Allow-Methods")
resp.Header.Del("Access-Control-Allow-Origin")
}
......@@ -482,6 +482,8 @@ github.com/modern-go/concurrent
github.com/modern-go/reflect2
# github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d => github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d
github.com/munnerz/goautoneg
# github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f => github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f
github.com/mxk/go-flowrate/flowrate
# github.com/naoina/go-stringutil v0.1.0 => github.com/naoina/go-stringutil v0.1.0
github.com/naoina/go-stringutil
# github.com/naoina/toml v0.1.1 => github.com/naoina/toml v0.1.1
......@@ -999,6 +1001,7 @@ k8s.io/apimachinery/pkg/util/json
k8s.io/apimachinery/pkg/util/mergepatch
k8s.io/apimachinery/pkg/util/naming
k8s.io/apimachinery/pkg/util/net
k8s.io/apimachinery/pkg/util/proxy
k8s.io/apimachinery/pkg/util/rand
k8s.io/apimachinery/pkg/util/remotecommand
k8s.io/apimachinery/pkg/util/runtime
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册