diff --git a/cmd/ks-apiserver/app/options/options.go b/cmd/ks-apiserver/app/options/options.go index cce118bc0d6ad637f598b3505285046ee28eb2bd..4b2eabbd3229a9402cf7796caa5cb2bc0982b6ab 100644 --- a/cmd/ks-apiserver/app/options/options.go +++ b/cmd/ks-apiserver/app/options/options.go @@ -11,6 +11,7 @@ import ( apiserverconfig "kubesphere.io/kubesphere/pkg/apiserver/config" "kubesphere.io/kubesphere/pkg/informers" genericoptions "kubesphere.io/kubesphere/pkg/server/options" + auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch" "kubesphere.io/kubesphere/pkg/simple/client/cache" "kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins" eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events/elasticsearch" @@ -56,6 +57,7 @@ func NewServerRunOptions() *ServerRunOptions { AuthenticationOptions: authoptions.NewAuthenticateOptions(), MultiClusterOptions: multicluster.NewOptions(), EventsOptions: eventsclient.NewElasticSearchOptions(), + AuditingOptions: auditingclient.NewElasticSearchOptions(), }, } @@ -81,6 +83,7 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) { s.LoggingOptions.AddFlags(fss.FlagSet("logging"), s.LoggingOptions) s.MultiClusterOptions.AddFlags(fss.FlagSet("multicluster"), s.MultiClusterOptions) s.EventsOptions.AddFlags(fss.FlagSet("events"), s.EventsOptions) + s.AuditingOptions.AddFlags(fss.FlagSet("auditing"), s.AuditingOptions) fs = fss.FlagSet("klog") local := flag.NewFlagSet("klog", flag.ExitOnError) @@ -188,6 +191,14 @@ func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIS apiServer.EventsClient = eventsClient } + if s.AuditingOptions.Host != "" { + auditingClient, err := auditingclient.NewClient(s.AuditingOptions) + if err != nil { + return nil, err + } + apiServer.AuditingClient = auditingClient + } + if s.OpenPitrixOptions != nil { opClient, err := openpitrix.NewClient(s.OpenPitrixOptions) if err != nil { diff --git a/cmd/ks-apiserver/app/options/validation.go b/cmd/ks-apiserver/app/options/validation.go index 6c620d3ee377e1f096275d618916e825593f13aa..648ddcf17a56f1776a60ffe94184ae92b4c6535d 100644 --- a/cmd/ks-apiserver/app/options/validation.go +++ b/cmd/ks-apiserver/app/options/validation.go @@ -17,6 +17,7 @@ func (s *ServerRunOptions) Validate() []error { errors = append(errors, s.LoggingOptions.Validate()...) errors = append(errors, s.AuthorizationOptions.Validate()...) errors = append(errors, s.EventsOptions.Validate()...) + errors = append(errors, s.AuditingOptions.Validate()...) return errors } diff --git a/pkg/api/auditing/v1alpha1/types.go b/pkg/api/auditing/v1alpha1/types.go new file mode 100644 index 0000000000000000000000000000000000000000..6d8357ea3fd09705220c0a4b940912e4f2f70da5 --- /dev/null +++ b/pkg/api/auditing/v1alpha1/types.go @@ -0,0 +1,108 @@ +/* +Copyright 2020 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "github.com/emicklei/go-restful" + "kubesphere.io/kubesphere/pkg/simple/client/auditing" + "strconv" + "time" +) + +type APIResponse struct { + Events *auditing.Events `json:"query,omitempty" description:"query results"` + Statistics *auditing.Statistics `json:"statistics,omitempty" description:"statistics results"` + Histogram *auditing.Histogram `json:"histogram,omitempty" description:"histogram results"` +} + +type Query struct { + Operation string `json:"operation,omitempty"` + WorkspaceFilter string `json:"workspace_filter,omitempty"` + WorkspaceSearch string `json:"workspace_search,omitempty"` + ObjectRefNamespaceFilter string `json:"objectref_namespace_filter,omitempty"` + ObjectRefNamespaceSearch string `json:"objectref_namespace_search,omitempty"` + ObjectRefNameFilter string `json:"objectref_name_filter,omitempty"` + ObjectRefNameSearch string `json:"objectref_name_search,omitempty"` + LevelFilter string `json:"level_filter,omitempty"` + VerbFilter string `json:"verb_filter,omitempty"` + UserFilter string `json:"user_filter,omitempty"` + UserSearch string `json:"user_search,omitempty"` + GroupSearch string `json:"group_search,omitempty"` + SourceIpSearch string `json:"source_ip_search,omitempty"` + ObjectRefResourceFilter string `json:"objectref_resource_filter,omitempty"` + ObjectRefSubresourceFilter string `json:"objectref_subresource_filter,omitempty"` + ResponesStatusFilter string `json:"response_status_filter,omitempty"` + + StartTime *time.Time `json:"start_time,omitempty"` + EndTime *time.Time `json:"end_time,omitempty"` + + Interval string `json:"interval,omitempty"` + Sort string `json:"sort,omitempty"` + From int64 `json:"from,omitempty"` + Size int64 `json:"size,omitempty"` +} + +func ParseQueryParameter(req *restful.Request) (*Query, error) { + q := &Query{} + + q.Operation = req.QueryParameter("operation") + q.WorkspaceFilter = req.QueryParameter("workspace_filter") + q.WorkspaceSearch = req.QueryParameter("workspace_search") + q.ObjectRefNamespaceFilter = req.QueryParameter("objectref_namespace_filter") + q.ObjectRefNamespaceSearch = req.QueryParameter("objectref_namespace_search") + q.ObjectRefNameFilter = req.QueryParameter("objectref_name_filter") + q.ObjectRefNameSearch = req.QueryParameter("objectref_name_search") + q.LevelFilter = req.QueryParameter("level_filter") + q.VerbFilter = req.QueryParameter("verb_filter") + q.SourceIpSearch = req.QueryParameter("source_ip_search") + q.UserFilter = req.QueryParameter("user_filter") + q.UserSearch = req.QueryParameter("user_search") + q.GroupSearch = req.QueryParameter("group_search") + q.ObjectRefResourceFilter = req.QueryParameter("objectref_resource_filter") + q.ObjectRefSubresourceFilter = req.QueryParameter("objectref_subresource_filter") + q.ResponesStatusFilter = req.QueryParameter("response_status_filter") + + if tstr := req.QueryParameter("start_time"); tstr != "" { + sec, err := strconv.ParseInt(tstr, 10, 64) + if err != nil { + return nil, err + } + t := time.Unix(sec, 0) + q.StartTime = &t + } + if tstr := req.QueryParameter("end_time"); tstr != "" { + sec, err := strconv.ParseInt(tstr, 10, 64) + if err != nil { + return nil, err + } + t := time.Unix(sec, 0) + q.EndTime = &t + } + if q.Interval = req.QueryParameter("interval"); q.Interval == "" { + q.Interval = "15m" + } + q.From, _ = strconv.ParseInt(req.QueryParameter("from"), 10, 64) + size, err := strconv.ParseInt(req.QueryParameter("size"), 10, 64) + if err != nil { + size = 10 + } + q.Size = size + if q.Sort = req.QueryParameter("sort"); q.Sort != "asc" { + q.Sort = "desc" + } + return q, nil +} diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 4cc56489b2b0c11d97f6100cbecf213838eea217..2498f65a9ba1db41bfcfc18225769b0d44cad8d9 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -63,6 +63,7 @@ import ( "kubesphere.io/kubesphere/pkg/kapis/version" "kubesphere.io/kubesphere/pkg/models/iam/am" "kubesphere.io/kubesphere/pkg/models/iam/im" + "kubesphere.io/kubesphere/pkg/simple/client/auditing" "kubesphere.io/kubesphere/pkg/simple/client/cache" "kubesphere.io/kubesphere/pkg/simple/client/devops" "kubesphere.io/kubesphere/pkg/simple/client/events" @@ -135,6 +136,8 @@ type APIServer struct { SonarClient sonarqube.SonarInterface EventsClient events.Client + + AuditingClient auditing.Client } func (s *APIServer) PrepareRun() error { @@ -172,7 +175,7 @@ func (s *APIServer) installKubeSphereAPIs() { urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory, s.KubernetesClient.Master())) urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(), - s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient)) + s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient)) urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.KubernetesClient.Config())) urlruntime.Must(clusterkapisv1alpha1.AddToContainer(s.container, s.InformerFactory.KubernetesSharedInformerFactory(), diff --git a/pkg/apiserver/config/config.go b/pkg/apiserver/config/config.go index d0f717e2ebd0d05330a9af4013a23f4c163f69b9..e11746a708946620c74c030f80642a7bee511b2b 100644 --- a/pkg/apiserver/config/config.go +++ b/pkg/apiserver/config/config.go @@ -22,6 +22,7 @@ import ( authoptions "kubesphere.io/kubesphere/pkg/apiserver/authentication/options" authorizationoptions "kubesphere.io/kubesphere/pkg/apiserver/authorization/options" "kubesphere.io/kubesphere/pkg/simple/client/alerting" + auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch" "kubesphere.io/kubesphere/pkg/simple/client/cache" "kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins" eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events/elasticsearch" @@ -92,6 +93,7 @@ type Config struct { AuthorizationOptions *authorizationoptions.AuthorizationOptions `json:"authorization,omitempty" yaml:"authorization,omitempty" mapstructure:"authorization"` MultiClusterOptions *multicluster.Options `json:"multicluster,omitempty" yaml:"multicluster,omitempty" mapstructure:"multicluster"` EventsOptions *eventsclient.Options `json:"events,omitempty" yaml:"events,omitempty" mapstructure:"events"` + AuditingOptions *auditingclient.Options `json:"auditing,omitempty" yaml:"auditing,omitempty" mapstructure:"auditing"` // Options used for enabling components, not actually used now. Once we switch Alerting/Notification API to kubesphere, // we can add these options to kubesphere command lines AlertingOptions *alerting.Options `json:"alerting,omitempty" yaml:"alerting,omitempty" mapstructure:"alerting"` @@ -118,6 +120,7 @@ func New() *Config { AuthorizationOptions: authorizationoptions.NewAuthorizationOptions(), MultiClusterOptions: multicluster.NewOptions(), EventsOptions: eventsclient.NewElasticSearchOptions(), + AuditingOptions: auditingclient.NewElasticSearchOptions(), } } @@ -235,4 +238,8 @@ func (conf *Config) stripEmptyOptions() { if conf.EventsOptions != nil && conf.EventsOptions.Host == "" { conf.EventsOptions = nil } + + if conf.AuditingOptions != nil && conf.AuditingOptions.Host == "" { + conf.AuditingOptions = nil + } } diff --git a/pkg/apiserver/config/config_test.go b/pkg/apiserver/config/config_test.go index a414a2b339d6c62040c9b19e723ccc99a759439b..566f90d52826db4307329576945fc074f085cbdc 100644 --- a/pkg/apiserver/config/config_test.go +++ b/pkg/apiserver/config/config_test.go @@ -25,6 +25,7 @@ import ( authoptions "kubesphere.io/kubesphere/pkg/apiserver/authentication/options" authorizationoptions "kubesphere.io/kubesphere/pkg/apiserver/authorization/options" "kubesphere.io/kubesphere/pkg/simple/client/alerting" + auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch" "kubesphere.io/kubesphere/pkg/simple/client/cache" "kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins" eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events/elasticsearch" @@ -145,6 +146,11 @@ func newTestConfig() (*Config, error) { IndexPrefix: "ks-logstash-events", Version: "6", }, + AuditingOptions: &auditingclient.Options{ + Host: "http://elasticsearch-logging-data.kubesphere-logging-system.svc:9200", + IndexPrefix: "ks-logstash-auditing", + Version: "6", + }, } return conf, nil } diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index b9a65069c728d6384fc268c293dcac65c901ecb8..b09a772989cb344c72dbda601f5d6ad68cc1cb62 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -83,6 +83,7 @@ const ( LogQueryTag = "Log Query" TerminalTag = "Terminal" EventsQueryTag = "Events Query" + AuditingQueryTag = "Auditing Query" ) var ( diff --git a/pkg/kapis/tenant/v1alpha2/handler.go b/pkg/kapis/tenant/v1alpha2/handler.go index e267eb9bed2e338ad197d351debfeddc23ea5b15..927639055ad42993a7e0989174f26d56b003fcf8 100644 --- a/pkg/kapis/tenant/v1alpha2/handler.go +++ b/pkg/kapis/tenant/v1alpha2/handler.go @@ -8,6 +8,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog" "kubesphere.io/kubesphere/pkg/api" + auditingv1alpha1 "kubesphere.io/kubesphere/pkg/api/auditing/v1alpha1" eventsv1alpha1 "kubesphere.io/kubesphere/pkg/api/events/v1alpha1" loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2" tenantv1alpha2 "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha2" @@ -17,6 +18,7 @@ import ( "kubesphere.io/kubesphere/pkg/informers" "kubesphere.io/kubesphere/pkg/models/tenant" servererr "kubesphere.io/kubesphere/pkg/server/errors" + "kubesphere.io/kubesphere/pkg/simple/client/auditing" "kubesphere.io/kubesphere/pkg/simple/client/events" "kubesphere.io/kubesphere/pkg/simple/client/logging" ) @@ -25,10 +27,10 @@ type tenantHandler struct { tenant tenant.Interface } -func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Interface) *tenantHandler { +func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Interface, auditingclient auditing.Client) *tenantHandler { return &tenantHandler{ - tenant: tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient), + tenant: tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient), } } @@ -282,3 +284,29 @@ func (h *tenantHandler) QueryLogs(req *restful.Request, resp *restful.Response) resp.WriteAsJson(result) } } + +func (h *tenantHandler) Auditing(req *restful.Request, resp *restful.Response) { + user, ok := request.UserFrom(req.Request.Context()) + if !ok { + err := fmt.Errorf("cannot obtain user info") + klog.Errorln(err) + api.HandleForbidden(resp, req, err) + return + } + queryParam, err := auditingv1alpha1.ParseQueryParameter(req) + if err != nil { + klog.Errorln(err) + api.HandleInternalError(resp, req, err) + return + } + + result, err := h.tenant.Auditing(user, queryParam) + if err != nil { + klog.Errorln(err) + api.HandleInternalError(resp, req, err) + return + } + + _ = resp.WriteEntity(result) + +} diff --git a/pkg/kapis/tenant/v1alpha2/register.go b/pkg/kapis/tenant/v1alpha2/register.go index 4a924abb59bbb92cce42d0cad08f17bfe070a154..9a3c9d02093037617bbdad7923dc958d0c463e68 100644 --- a/pkg/kapis/tenant/v1alpha2/register.go +++ b/pkg/kapis/tenant/v1alpha2/register.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" "kubesphere.io/kubesphere/pkg/api" + auditingv1alpha1 "kubesphere.io/kubesphere/pkg/api/auditing/v1alpha1" eventsv1alpha1 "kubesphere.io/kubesphere/pkg/api/events/v1alpha1" loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2" tenantv1alpha2 "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha2" @@ -32,6 +33,7 @@ import ( "kubesphere.io/kubesphere/pkg/informers" "kubesphere.io/kubesphere/pkg/models" "kubesphere.io/kubesphere/pkg/server/errors" + "kubesphere.io/kubesphere/pkg/simple/client/auditing" "kubesphere.io/kubesphere/pkg/simple/client/events" "kubesphere.io/kubesphere/pkg/simple/client/logging" "net/http" @@ -43,9 +45,9 @@ const ( var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha2"} -func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Interface) error { +func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Interface, auditingclient auditing.Client) error { ws := runtime.NewWebService(GroupVersion) - handler := newTenantHandler(factory, k8sclient, ksclient, evtsClient, loggingClient) + handler := newTenantHandler(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient) ws.Route(ws.POST("/workspaces"). To(handler.CreateWorkspace). @@ -146,6 +148,35 @@ func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8s Consumes(restful.MIME_JSON, restful.MIME_XML). Produces(restful.MIME_JSON, "text/plain") + ws.Route(ws.GET("/auditing/events"). + To(handler.Auditing). + Doc("Query auditing events against the cluster"). + Param(ws.QueryParameter("operation", "Operation type. This can be one of three types: `query` (for querying events), `statistics` (for retrieving statistical data), `histogram` (for displaying events count by time interval). Defaults to query.").DefaultValue("query")). + Param(ws.QueryParameter("workspace_filter", "A comma-separated list of workspaces. This field restricts the query to specified workspaces. For example, the following filter matches the workspace my-ws and demo-ws: `my-ws,demo-ws`.")). + Param(ws.QueryParameter("workspace_search", "A comma-separated list of keywords. Differing from **workspace_filter**, this field performs fuzzy matching on workspaces. For example, the following value limits the query to workspaces whose name contains the word my(My,MY,...) *OR* demo(Demo,DemO,...): `my,demo`.")). + Param(ws.QueryParameter("objectref_namespace_filter", "A comma-separated list of namespaces. This field restricts the query to specified `ObjectRef.Namespace`.")). + Param(ws.QueryParameter("objectref_namespace_search", "A comma-separated list of keywords. Differing from **objectref_namespace_filter**, this field performs fuzzy matching on `ObjectRef.Namespace`.")). + Param(ws.QueryParameter("objectref_name_filter", "A comma-separated list of names. This field restricts the query to specified `ObjectRef.Name`.")). + Param(ws.QueryParameter("objectref_name_search", "A comma-separated list of keywords. Differing from **objectref_name_filter**, this field performs fuzzy matching on `ObjectRef.Name`.")). + Param(ws.QueryParameter("level_filter", "A comma-separated list of levels. This know values are Metadata, Request, RequestResponse.")). + Param(ws.QueryParameter("verb_filter", "A comma-separated list of verbs. This field restricts the query to specified verb. This field restricts the query to specified `Verb`.")). + Param(ws.QueryParameter("user_filter", "A comma-separated list of user. This field restricts the query to specified user. For example, the following filter matches the user user1 and user2: `user1,user2`.")). + Param(ws.QueryParameter("user_search", "A comma-separated list of keywords. Differing from **user_filter**, this field performs fuzzy matching on 'User.username'. For example, the following value limits the query to user whose name contains the word my(My,MY,...) *OR* demo(Demo,DemO,...): `my,demo`.")). + Param(ws.QueryParameter("group_search", "A comma-separated list of keywords. This field performs fuzzy matching on 'User.Groups'. For example, the following value limits the query to group which contains the word my(My,MY,...) *OR* demo(Demo,DemO,...): `my,demo`.")). + Param(ws.QueryParameter("source_ip_search", "A comma-separated list of keywords. This field performs fuzzy matching on 'SourceIPs'. For example, the following value limits the query to SourceIPs which contains 127.0 *OR* 192.168.: `127.0,192.168.`.")). + Param(ws.QueryParameter("objectref_resource_filter", "A comma-separated list of resource. This field restricts the query to specified ip. This field restricts the query to specified `ObjectRef.Resource`.")). + Param(ws.QueryParameter("objectref_subresource_filter", "A comma-separated list of subresource. This field restricts the query to specified subresource. This field restricts the query to specified `ObjectRef.Subresource`.")). + Param(ws.QueryParameter("response_status_filter", "A comma-separated list of response status code. This field restricts the query to specified response status code. This field restricts the query to specified `ResponseStatus.Code`.")). + Param(ws.QueryParameter("start_time", "Start time of query (limits `RequestReceivedTimestamp`). The format is a string representing seconds since the epoch, eg. 1136214245.")). + Param(ws.QueryParameter("end_time", "End time of query (limits `RequestReceivedTimestamp`). The format is a string representing seconds since the epoch, eg. 1136214245.")). + Param(ws.QueryParameter("interval", "Time interval. It requires **operation** is set to `histogram`. The format is [0-9]+[smhdwMqy]. Defaults to 15m (i.e. 15 min).").DefaultValue("15m")). + Param(ws.QueryParameter("sort", "Sort order. One of asc, desc. This field sorts events by `RequestReceivedTimestamp`.").DataType("string").DefaultValue("desc")). + Param(ws.QueryParameter("from", "The offset from the result set. This field returns query results from the specified offset. It requires **operation** is set to `query`. Defaults to 0 (i.e. from the beginning of the result set).").DataType("integer").DefaultValue("0").Required(false)). + Param(ws.QueryParameter("size", "Size of result set to return. It requires **operation** is set to `query`. Defaults to 10 (i.e. 10 event records).").DataType("integer").DefaultValue("10").Required(false)). + Metadata(restfulspec.KeyOpenAPITags, []string{constants.AuditingQueryTag}). + Writes(auditingv1alpha1.APIResponse{}). + Returns(http.StatusOK, api.StatusOK, auditingv1alpha1.APIResponse{})) + c.Add(ws) return nil } diff --git a/pkg/models/auditing/events.go b/pkg/models/auditing/events.go new file mode 100644 index 0000000000000000000000000000000000000000..1088b750f5ef81c2237d4bf5e0ba4d6770d1aa53 --- /dev/null +++ b/pkg/models/auditing/events.go @@ -0,0 +1,94 @@ +/* +Copyright 2020 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package auditing + +import ( + "kubesphere.io/kubesphere/pkg/api/auditing/v1alpha1" + "kubesphere.io/kubesphere/pkg/simple/client/auditing" + "kubesphere.io/kubesphere/pkg/utils/stringutils" + "strconv" +) + +type Interface interface { + Events(queryParam *v1alpha1.Query, MutateFilterFunc func(*auditing.Filter)) (*v1alpha1.APIResponse, error) +} + +type eventsOperator struct { + client auditing.Client +} + +func NewEventsOperator(client auditing.Client) Interface { + return &eventsOperator{client} +} + +func (eo *eventsOperator) Events(queryParam *v1alpha1.Query, + MutateFilterFunc func(*auditing.Filter)) (*v1alpha1.APIResponse, error) { + filter := &auditing.Filter{ + ObjectRefNames: stringutils.Split(queryParam.ObjectRefNameFilter, ","), + ObjectRefNameFuzzy: stringutils.Split(queryParam.ObjectRefNameSearch, ","), + Levels: stringutils.Split(queryParam.LevelFilter, ","), + Verbs: stringutils.Split(queryParam.VerbFilter, ","), + Users: stringutils.Split(queryParam.UserFilter, ","), + UserFuzzy: stringutils.Split(queryParam.UserSearch, ","), + GroupFuzzy: stringutils.Split(queryParam.GroupSearch, ","), + SourceIpFuzzy: stringutils.Split(queryParam.SourceIpSearch, ","), + ObjectRefResources: stringutils.Split(queryParam.ObjectRefResourceFilter, ","), + ObjectRefSubresources: stringutils.Split(queryParam.ObjectRefSubresourceFilter, ","), + StartTime: queryParam.StartTime, + EndTime: queryParam.EndTime, + } + if MutateFilterFunc != nil { + MutateFilterFunc(filter) + } + + cs := stringutils.Split(queryParam.ResponesStatusFilter, ",") + for _, c := range cs { + code, err := strconv.ParseInt(c, 10, 64) + if err != nil { + continue + } + + filter.ResponseStatus = append(filter.ResponseStatus, int32(code)) + } + + var ar v1alpha1.APIResponse + var err error + switch queryParam.Operation { + case "histogram": + if len(filter.ObjectRefNamespaceMap) == 0 { + ar.Histogram = &auditing.Histogram{} + } else { + ar.Histogram, err = eo.client.CountOverTime(filter, queryParam.Interval) + } + case "statistics": + if len(filter.ObjectRefNamespaceMap) == 0 { + ar.Statistics = &auditing.Statistics{} + } else { + ar.Statistics, err = eo.client.StatisticsOnResources(filter) + } + default: + if len(filter.ObjectRefNamespaceMap) == 0 { + ar.Events = &auditing.Events{} + } else { + ar.Events, err = eo.client.SearchAuditingEvent(filter, queryParam.From, queryParam.Size, queryParam.Sort) + } + } + if err != nil { + return nil, err + } + return &ar, nil +} diff --git a/pkg/models/tenant/tenant.go b/pkg/models/tenant/tenant.go index 9303e171725e42a5460adce68273effb8bc2b02f..f2d6f4ce081f7cf618d59c75834533123a943135 100644 --- a/pkg/models/tenant/tenant.go +++ b/pkg/models/tenant/tenant.go @@ -27,6 +27,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog" "kubesphere.io/kubesphere/pkg/api" + auditingv1alpha1 "kubesphere.io/kubesphere/pkg/api/auditing/v1alpha1" eventsv1alpha1 "kubesphere.io/kubesphere/pkg/api/events/v1alpha1" loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2" clusterv1alpha1 "kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1" @@ -37,11 +38,13 @@ import ( "kubesphere.io/kubesphere/pkg/apiserver/query" kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned" "kubesphere.io/kubesphere/pkg/informers" + "kubesphere.io/kubesphere/pkg/models/auditing" "kubesphere.io/kubesphere/pkg/models/events" "kubesphere.io/kubesphere/pkg/models/iam/am" "kubesphere.io/kubesphere/pkg/models/logging" resources "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3" resourcesv1alpha3 "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/resource" + auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing" eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events" loggingclient "kubesphere.io/kubesphere/pkg/simple/client/logging" "kubesphere.io/kubesphere/pkg/utils/stringutils" @@ -62,6 +65,7 @@ type Interface interface { Events(user user.Info, queryParam *eventsv1alpha1.Query) (*eventsv1alpha1.APIResponse, error) QueryLogs(user user.Info, query *loggingv1alpha2.Query) (*loggingv1alpha2.APIResponse, error) ExportLogs(user user.Info, query *loggingv1alpha2.Query, writer io.Writer) error + Auditing(user user.Info, queryParam *auditingv1alpha1.Query) (*auditingv1alpha1.APIResponse, error) } type tenantOperator struct { @@ -72,9 +76,10 @@ type tenantOperator struct { resourceGetter *resourcesv1alpha3.ResourceGetter events events.Interface lo logging.LoggingOperator + auditing auditing.Interface } -func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Interface) Interface { +func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Interface, auditingclient auditingclient.Client) Interface { amOperator := am.NewReadOnlyOperator(informers) authorizer := authorizerfactory.NewRBACAuthorizer(amOperator) return &tenantOperator{ @@ -85,6 +90,7 @@ func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ks ksclient: ksclient, events: events.NewEventsOperator(evtsClient), lo: logging.NewLoggingOperator(loggingClient), + auditing: auditing.NewEventsOperator(auditingclient), } } @@ -523,6 +529,48 @@ func (t *tenantOperator) ExportLogs(user user.Info, query *loggingv1alpha2.Query } } +func (t *tenantOperator) Auditing(user user.Info, queryParam *auditingv1alpha1.Query) (*auditingv1alpha1.APIResponse, error) { + iNamespaces, err := t.listIntersectedNamespaces(user, + stringutils.Split(queryParam.WorkspaceFilter, ","), + stringutils.Split(queryParam.WorkspaceSearch, ","), + stringutils.Split(queryParam.ObjectRefNamespaceFilter, ","), + stringutils.Split(queryParam.ObjectRefNamespaceSearch, ",")) + if err != nil { + klog.Error(err) + return nil, err + } + + namespaceCreateTimeMap := make(map[string]time.Time) + for _, ns := range iNamespaces { + namespaceCreateTimeMap[ns.Name] = ns.CreationTimestamp.Time + } + // If there are no ns and ws query conditions, + // those events with empty `ObjectRef.Namespace` will also be listed when user can list all namespaces + if len(queryParam.WorkspaceFilter) == 0 && len(queryParam.ObjectRefNamespaceFilter) == 0 && + len(queryParam.WorkspaceSearch) == 0 && len(queryParam.ObjectRefNamespaceSearch) == 0 { + listEvts := authorizer.AttributesRecord{ + User: user, + Verb: "list", + APIGroup: "", + APIVersion: "v1", + Resource: "namespaces", + ResourceRequest: true, + } + decision, _, err := t.authorizer.Authorize(listEvts) + if err != nil { + klog.Error(err) + return nil, err + } + if decision == authorizer.DecisionAllow { + namespaceCreateTimeMap[""] = time.Time{} + } + } + + return t.auditing.Events(queryParam, func(filter *auditingclient.Filter) { + filter.ObjectRefNamespaceMap = namespaceCreateTimeMap + }) +} + func contains(objects []runtime.Object, object runtime.Object) bool { for _, item := range objects { if item == object { diff --git a/pkg/models/tenant/tenent_test.go b/pkg/models/tenant/tenent_test.go index 765c437b387f273d757769fe43b450ef73ea35d9..4018ed14e674fe3a8e880c20ebebc478690d9757 100644 --- a/pkg/models/tenant/tenent_test.go +++ b/pkg/models/tenant/tenent_test.go @@ -328,5 +328,5 @@ func prepare() Interface { RoleBindings().Informer().GetIndexer().Add(roleBinding) } - return New(fakeInformerFactory, nil, nil, nil, nil) + return New(fakeInformerFactory, nil, nil, nil, nil, nil) } diff --git a/pkg/simple/client/auditing/elasticsearch/clients.go b/pkg/simple/client/auditing/elasticsearch/clients.go new file mode 100644 index 0000000000000000000000000000000000000000..e80fee34ce18d6dca973454fee947401fccf530c --- /dev/null +++ b/pkg/simple/client/auditing/elasticsearch/clients.go @@ -0,0 +1,171 @@ +/* +Copyright 2020 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package elasticsearch + +import ( + "fmt" + es5 "github.com/elastic/go-elasticsearch/v5" + es5api "github.com/elastic/go-elasticsearch/v5/esapi" + es6 "github.com/elastic/go-elasticsearch/v6" + es6api "github.com/elastic/go-elasticsearch/v6/esapi" + es7 "github.com/elastic/go-elasticsearch/v7" + es7api "github.com/elastic/go-elasticsearch/v7/esapi" + jsoniter "github.com/json-iterator/go" + "io" + "net/http" +) + +type Request struct { + Index string + Body io.Reader +} + +type Response struct { + Hits Hits `json:"hits"` + Aggregations map[string]jsoniter.RawMessage `json:"aggregations"` +} + +type Hits struct { + Total int64 `json:"total"` + Hits jsoniter.RawMessage `json:"hits"` +} + +type Error struct { + Type string `json:"type"` + Reason string `json:"reason"` + Status int `json:"status"` +} + +func (e Error) Error() string { + return fmt.Sprintf("%s %s: %s", http.StatusText(e.Status), e.Type, e.Reason) +} + +type ClientV5 es5.Client + +func (c *ClientV5) ExSearch(r *Request) (*Response, error) { + return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body))) +} +func (c *ClientV5) parse(resp *es5api.Response, err error) (*Response, error) { + if err != nil { + return nil, fmt.Errorf("error getting response: %s", err) + } + defer func() { + _ = resp.Body.Close() + }() + if resp.IsError() { + return nil, fmt.Errorf(resp.String()) + } + var r struct { + Hits struct { + Total int64 `json:"total"` + Hits jsoniter.RawMessage `json:"hits"` + } `json:"hits"` + Aggregations map[string]jsoniter.RawMessage `json:"aggregations"` + } + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + return nil, fmt.Errorf("error parsing the response body: %s", err) + } + return &Response{ + Hits: Hits{Total: r.Hits.Total, Hits: r.Hits.Hits}, + Aggregations: r.Aggregations, + }, nil +} +func (c *ClientV5) Version() (string, error) { + resp, err := c.Info() + if err != nil { + return "", err + } + defer func() { + _ = resp.Body.Close() + }() + if resp.IsError() { + return "", fmt.Errorf(resp.String()) + } + var r map[string]interface{} + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + return "", fmt.Errorf("error parsing the response body: %s", err) + } + return fmt.Sprintf("%s", r["version"].(map[string]interface{})["number"]), nil +} + +type ClientV6 es6.Client + +func (c *ClientV6) ExSearch(r *Request) (*Response, error) { + return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body))) +} +func (c *ClientV6) parse(resp *es6api.Response, err error) (*Response, error) { + if err != nil { + return nil, fmt.Errorf("error getting response: %s", err) + } + defer func() { + _ = resp.Body.Close() + }() + if resp.IsError() { + return nil, fmt.Errorf(resp.String()) + } + var r struct { + Hits *struct { + Total int64 `json:"total"` + Hits jsoniter.RawMessage `json:"hits"` + } `json:"hits"` + Aggregations map[string]jsoniter.RawMessage `json:"aggregations"` + } + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + return nil, fmt.Errorf("error parsing the response body: %s", err) + } + return &Response{ + Hits: Hits{Total: r.Hits.Total, Hits: r.Hits.Hits}, + Aggregations: r.Aggregations, + }, nil +} + +type ClientV7 es7.Client + +func (c *ClientV7) ExSearch(r *Request) (*Response, error) { + return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body))) +} +func (c *ClientV7) parse(resp *es7api.Response, err error) (*Response, error) { + if err != nil { + return nil, fmt.Errorf("error getting response: %s", err) + } + defer func() { + _ = resp.Body.Close() + }() + if resp.IsError() { + return nil, fmt.Errorf(resp.String()) + } + var r struct { + Hits *struct { + Total struct { + Value int64 `json:"value"` + } `json:"total"` + Hits jsoniter.RawMessage `json:"hits"` + } `json:"hits"` + Aggregations map[string]jsoniter.RawMessage `json:"aggregations"` + } + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + return nil, fmt.Errorf("error parsing the response body: %s", err) + } + return &Response{ + Hits: Hits{Total: r.Hits.Total.Value, Hits: r.Hits.Hits}, + Aggregations: r.Aggregations, + }, nil +} + +type client interface { + ExSearch(r *Request) (*Response, error) +} diff --git a/pkg/simple/client/auditing/elasticsearch/elasticsearch.go b/pkg/simple/client/auditing/elasticsearch/elasticsearch.go new file mode 100644 index 0000000000000000000000000000000000000000..74dbdbf6b22d0cc793b57304638fafcd04a05650 --- /dev/null +++ b/pkg/simple/client/auditing/elasticsearch/elasticsearch.go @@ -0,0 +1,391 @@ +/* +Copyright 2020 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package elasticsearch + +import ( + "bytes" + "fmt" + "strings" + "time" + + es5 "github.com/elastic/go-elasticsearch/v5" + es6 "github.com/elastic/go-elasticsearch/v6" + es7 "github.com/elastic/go-elasticsearch/v7" + jsoniter "github.com/json-iterator/go" + "kubesphere.io/kubesphere/pkg/simple/client/auditing" +) + +var json = jsoniter.ConfigCompatibleWithStandardLibrary + +type Elasticsearch struct { + c client + opts struct { + index string + } +} + +func (es *Elasticsearch) SearchAuditingEvent(filter *auditing.Filter, from, size int64, + sort string) (*auditing.Events, error) { + queryPart := parseToQueryPart(filter) + if sort == "" { + sort = "desc" + } + sortPart := []map[string]interface{}{{ + "RequestReceivedTimestamp": map[string]string{"order": sort}, + }} + b := map[string]interface{}{ + "from": from, + "size": size, + "query": queryPart, + "sort": sortPart, + } + + body, err := json.Marshal(b) + if err != nil { + return nil, err + } + resp, err := es.c.ExSearch(&Request{ + Index: es.opts.index, + Body: bytes.NewBuffer(body), + }) + if err != nil || resp == nil { + return nil, err + } + + var innerHits []struct { + *auditing.Event `json:"_source"` + } + if err := json.Unmarshal(resp.Hits.Hits, &innerHits); err != nil { + return nil, err + } + evts := auditing.Events{Total: resp.Hits.Total} + for _, hit := range innerHits { + evts.Records = append(evts.Records, hit.Event) + } + return &evts, nil +} + +func (es *Elasticsearch) CountOverTime(filter *auditing.Filter, interval string) (*auditing.Histogram, error) { + if interval == "" { + interval = "15m" + } + + queryPart := parseToQueryPart(filter) + aggName := "events_count_over_timestamp" + aggsPart := map[string]interface{}{ + aggName: map[string]interface{}{ + "date_histogram": map[string]string{ + "field": "RequestReceivedTimestamp", + "interval": interval, + }, + }, + } + b := map[string]interface{}{ + "query": queryPart, + "aggs": aggsPart, + "size": 0, // do not get docs + } + + body, err := json.Marshal(b) + if err != nil { + return nil, err + } + resp, err := es.c.ExSearch(&Request{ + Index: es.opts.index, + Body: bytes.NewBuffer(body), + }) + if err != nil || resp == nil { + return nil, err + } + + raw := resp.Aggregations[aggName] + var agg struct { + Buckets []struct { + KeyAsString string `json:"key_as_string"` + Key int64 `json:"key"` + DocCount int64 `json:"doc_count"` + } `json:"buckets"` + } + if err := json.Unmarshal(raw, &agg); err != nil { + return nil, err + } + histo := auditing.Histogram{Total: int64(len(agg.Buckets))} + for _, b := range agg.Buckets { + histo.Buckets = append(histo.Buckets, + auditing.Bucket{Time: b.Key, Count: b.DocCount}) + } + return &histo, nil +} + +func (es *Elasticsearch) StatisticsOnResources(filter *auditing.Filter) (*auditing.Statistics, error) { + queryPart := parseToQueryPart(filter) + aggName := "resources_count" + aggsPart := map[string]interface{}{ + aggName: map[string]interface{}{ + "cardinality": map[string]string{ + "field": "AuditID.keyword", + }, + }, + } + b := map[string]interface{}{ + "query": queryPart, + "aggs": aggsPart, + "size": 0, // do not get docs + } + + body, err := json.Marshal(b) + if err != nil { + return nil, err + } + resp, err := es.c.ExSearch(&Request{ + Index: es.opts.index, + Body: bytes.NewBuffer(body), + }) + if err != nil || resp == nil { + return nil, err + } + + raw := resp.Aggregations[aggName] + var agg struct { + Value int64 `json:"value"` + } + if err := json.Unmarshal(raw, &agg); err != nil { + return nil, err + } + + return &auditing.Statistics{ + Resources: agg.Value, + Events: resp.Hits.Total, + }, nil +} + +func NewClient(options *Options) (*Elasticsearch, error) { + clientV5 := func() (*ClientV5, error) { + c, err := es5.NewClient(es5.Config{Addresses: []string{options.Host}}) + if err != nil { + return nil, err + } + return (*ClientV5)(c), nil + } + clientV6 := func() (*ClientV6, error) { + c, err := es6.NewClient(es6.Config{Addresses: []string{options.Host}}) + if err != nil { + return nil, err + } + return (*ClientV6)(c), nil + } + clientV7 := func() (*ClientV7, error) { + c, err := es7.NewClient(es7.Config{Addresses: []string{options.Host}}) + if err != nil { + return nil, err + } + return (*ClientV7)(c), nil + } + + var ( + version = options.Version + es = Elasticsearch{} + err error + ) + es.opts.index = fmt.Sprintf("%s*", options.IndexPrefix) + + if options.Version == "" { + var c5 *ClientV5 + if c5, err = clientV5(); err == nil { + if version, err = c5.Version(); err == nil { + es.c = c5 + } + } + } + if err != nil { + return nil, err + } + + switch strings.Split(version, ".")[0] { + case "5": + if es.c == nil { + es.c, err = clientV5() + } + case "6": + es.c, err = clientV6() + case "7": + es.c, err = clientV7() + default: + err = fmt.Errorf("unsupported elasticsearch version %s", version) + } + if err != nil { + return nil, err + } + return &es, nil +} + +func parseToQueryPart(f *auditing.Filter) interface{} { + if f == nil { + return nil + } + type BoolBody struct { + Filter []map[string]interface{} `json:"filter,omitempty"` + Should []map[string]interface{} `json:"should,omitempty"` + MinimumShouldMatch *int `json:"minimum_should_match,omitempty"` + } + var mini = 1 + b := BoolBody{} + queryBody := map[string]interface{}{ + "bool": &b, + } + + if len(f.ObjectRefNamespaceMap) > 0 { + bi := BoolBody{MinimumShouldMatch: &mini} + for k, v := range f.ObjectRefNamespaceMap { + bi.Should = append(bi.Should, map[string]interface{}{ + "bool": &BoolBody{ + Filter: []map[string]interface{}{{ + "match_phrase": map[string]string{"ObjectRef.Namespace.keyword": k}, + }, { + "range": map[string]interface{}{ + "RequestReceivedTimestamp": map[string]interface{}{ + "gte": v, + }, + }, + }}, + }, + }) + } + if len(bi.Should) > 0 { + b.Filter = append(b.Filter, map[string]interface{}{"bool": &bi}) + } + } + + shouldBoolbody := func(mtype, fieldName string, fieldValues []string, fieldValueMutate func(string) string) *BoolBody { + bi := BoolBody{MinimumShouldMatch: &mini} + for _, v := range fieldValues { + if fieldValueMutate != nil { + v = fieldValueMutate(v) + } + bi.Should = append(bi.Should, map[string]interface{}{ + mtype: map[string]string{fieldName: v}, + }) + } + if len(bi.Should) == 0 { + return nil + } + return &bi + } + + if len(f.ObjectRefNames) > 0 { + if bi := shouldBoolbody("match_phrase_prefix", "ObjectRef.Name.keyword", + f.ObjectRefNames, nil); bi != nil { + b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) + } + } + if len(f.ObjectRefNameFuzzy) > 0 { + if bi := shouldBoolbody("wildcard", "ObjectRef.Name", + f.ObjectRefNameFuzzy, func(s string) string { + return fmt.Sprintf("*" + s + "*") + }); bi != nil { + b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) + } + } + + if len(f.Verbs) > 0 { + if bi := shouldBoolbody("match_phrase", "Verb", + f.Verbs, nil); bi != nil { + b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) + } + } + if len(f.Levels) > 0 { + if bi := shouldBoolbody("match_phrase", "Level", + f.Levels, nil); bi != nil { + b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) + } + } + + if len(f.SourceIpFuzzy) > 0 { + if bi := shouldBoolbody("wildcard", "SourceIPs", + f.SourceIpFuzzy, func(s string) string { + return fmt.Sprintf("*" + s + "*") + }); bi != nil { + b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) + } + } + + if len(f.Users) > 0 { + if bi := shouldBoolbody("match_phrase", "User.Username.keyword", + f.Users, nil); bi != nil { + b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) + } + } + if len(f.UserFuzzy) > 0 { + if bi := shouldBoolbody("wildcard", "User.Username", + f.UserFuzzy, func(s string) string { + return fmt.Sprintf("*" + s + "*") + }); bi != nil { + b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) + } + } + + if len(f.GroupFuzzy) > 0 { + if bi := shouldBoolbody("wildcard", "User.Groups", + f.GroupFuzzy, func(s string) string { + return fmt.Sprintf("*" + s + "*") + }); bi != nil { + b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) + } + } + + if len(f.ObjectRefResources) > 0 { + if bi := shouldBoolbody("match_phrase_prefix", "ObjectRef.Resource.keyword", + f.ObjectRefResources, nil); bi != nil { + b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) + } + } + + if len(f.ObjectRefSubresources) > 0 { + if bi := shouldBoolbody("match_phrase_prefix", "ObjectRef.Subresource.keyword", + f.ObjectRefSubresources, nil); bi != nil { + b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) + } + } + + if f.ResponseStatus != nil && len(f.ResponseStatus) > 0 { + + bi := BoolBody{MinimumShouldMatch: &mini} + for _, v := range f.ResponseStatus { + bi.Should = append(bi.Should, map[string]interface{}{ + "term": map[string]int32{"ResponseStatus.code": v}, + }) + } + + b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) + } + + if f.StartTime != nil || f.EndTime != nil { + m := make(map[string]*time.Time) + if f.StartTime != nil { + m["gte"] = f.StartTime + } + if f.EndTime != nil { + m["lte"] = f.EndTime + } + b.Filter = append(b.Filter, map[string]interface{}{ + "range": map[string]interface{}{"RequestReceivedTimestamp": m}, + }) + + } + + return queryBody +} diff --git a/pkg/simple/client/auditing/elasticsearch/elasticsearch_test.go b/pkg/simple/client/auditing/elasticsearch/elasticsearch_test.go new file mode 100644 index 0000000000000000000000000000000000000000..e647f0828970ba4388d36cb8dfc68f629db7ed11 --- /dev/null +++ b/pkg/simple/client/auditing/elasticsearch/elasticsearch_test.go @@ -0,0 +1,224 @@ +/* +Copyright 2020 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package elasticsearch + +import ( + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "kubesphere.io/kubesphere/pkg/simple/client/auditing" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "testing" + "time" +) + +func MockElasticsearchService(pattern string, fakeCode int, fakeResp string) *httptest.Server { + mux := http.NewServeMux() + mux.HandleFunc(pattern, func(res http.ResponseWriter, req *http.Request) { + res.WriteHeader(fakeCode) + _, _ = res.Write([]byte(fakeResp)) + }) + return httptest.NewServer(mux) +} + +func TestStatisticsOnResources(t *testing.T) { + var tests = []struct { + description string + filter auditing.Filter + fakeVersion string + fakeCode int + fakeResp string + expected auditing.Statistics + expectedError bool + }{{ + description: "ES index exists", + filter: auditing.Filter{}, + fakeVersion: "6", + fakeCode: 200, + fakeResp: ` +{ + "took": 16, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": 10000, + "max_score": null, + "hits": [ + + ] + }, + "aggregations": { + "resources_count": { + "value": 100 + } + } +} +`, + expected: auditing.Statistics{ + Events: 10000, + Resources: 100, + }, + expectedError: false, + }, { + description: "ES index not exists", + filter: auditing.Filter{}, + fakeVersion: "6", + fakeCode: 404, + fakeResp: ` +{ + "error": { + "root_cause": [ + { + "type": "index_not_found_exception", + "reason": "no such index [events]", + "resource.type": "index_or_alias", + "resource.id": "events", + "index_uuid": "_na_", + "index": "events" + } + ], + "type": "index_not_found_exception", + "reason": "no such index [events]", + "resource.type": "index_or_alias", + "resource.id": "events", + "index_uuid": "_na_", + "index": "events" + }, + "status": 404 +} +`, + expectedError: true, + }} + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + mes := MockElasticsearchService("/", test.fakeCode, test.fakeResp) + defer mes.Close() + + es, err := NewClient(&Options{Host: mes.URL, IndexPrefix: "ks-logstash-events", Version: "6"}) + + if err != nil { + t.Fatal(err) + } + + stats, err := es.StatisticsOnResources(&test.filter) + + if test.expectedError { + if err == nil { + t.Fatalf("expected err like %s", test.fakeResp) + } else if !strings.Contains(err.Error(), strconv.Itoa(test.fakeCode)) { + t.Fatalf("err does not contain expected code: %d", test.fakeCode) + } + } else { + if err != nil { + t.Fatal(err) + } else if diff := cmp.Diff(stats, &test.expected); diff != "" { + t.Fatalf("%T differ (-got, +want): %s", test.expected, diff) + } + } + }) + } +} + +func TestParseToQueryPart(t *testing.T) { + q := ` +{ + "bool": { + "filter": [ + { + "bool": { + "should": [ + { + "bool": { + "filter": [ + { + "match_phrase": { + "ObjectRef.Namespace.keyword": "kubesphere-system" + } + }, + { + "range": { + "RequestReceivedTimestamp": { + "gte": "2020-01-01T01:01:01.000000001Z" + } + } + } + ] + } + } + ], + "minimum_should_match": 1 + } + }, + { + "bool": { + "should": [ + { + "wildcard": { + "ObjectRef.Name": "*istio*" + } + } + ], + "minimum_should_match": 1 + } + }, + { + "range": { + "RequestReceivedTimestamp": { + "gte": "2019-12-01T01:01:01.000000001Z" + } + } + } + ] + } +} +` + nsCreateTime := time.Date(2020, time.Month(1), 1, 1, 1, 1, 1, time.UTC) + startTime := nsCreateTime.AddDate(0, -1, 0) + + filter := &auditing.Filter{ + ObjectRefNamespaceMap: map[string]time.Time{ + "kubesphere-system": nsCreateTime, + }, + ObjectRefNameFuzzy: []string{"istio"}, + StartTime: &startTime, + } + + qp := parseToQueryPart(filter) + bs, err := json.Marshal(qp) + if err != nil { + panic(err) + } + + queryPart := &map[string]interface{}{} + if err := json.Unmarshal(bs, queryPart); err != nil { + panic(err) + } + expectedQueryPart := &map[string]interface{}{} + if err := json.Unmarshal([]byte(q), expectedQueryPart); err != nil { + panic(err) + } + + assert.Equal(t, expectedQueryPart, queryPart) +} diff --git a/pkg/simple/client/auditing/elasticsearch/options.go b/pkg/simple/client/auditing/elasticsearch/options.go new file mode 100644 index 0000000000000000000000000000000000000000..8b65aa0f6e087a429b39a49c0983c17beb4b20d6 --- /dev/null +++ b/pkg/simple/client/auditing/elasticsearch/options.go @@ -0,0 +1,61 @@ +/* +Copyright 2020 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package elasticsearch + +import ( + "github.com/spf13/pflag" + "kubesphere.io/kubesphere/pkg/utils/reflectutils" +) + +type Options struct { + Host string `json:"host" yaml:"host"` + IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"` + Version string `json:"version" yaml:"version"` +} + +func NewElasticSearchOptions() *Options { + return &Options{ + Host: "http://elasticsearch-logging-data.kubesphere-logging-system.svc:9200", + IndexPrefix: "ks-logstash-auditing", + Version: "", + } +} + +func (s *Options) ApplyTo(options *Options) { + if s.Host != "" { + reflectutils.Override(options, s) + } +} + +func (s *Options) Validate() []error { + errs := make([]error, 0) + return errs +} + +func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) { + fs.StringVar(&s.Host, "elasticsearch-host", c.Host, ""+ + "Elasticsearch service host. KubeSphere is using elastic as auditing store, "+ + "if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+ + " the following elastic search options will be ignored.") + + fs.StringVar(&s.IndexPrefix, "index-prefix", c.IndexPrefix, ""+ + "Index name prefix. KubeSphere will retrieve auditing against indices matching the prefix.") + + fs.StringVar(&s.Version, "elasticsearch-version", c.Version, ""+ + "Elasticsearch major version, e.g. 5/6/7, if left blank, will detect automatically."+ + "Currently, minimum supported version is 5.x") +} diff --git a/pkg/simple/client/auditing/interface.go b/pkg/simple/client/auditing/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..02bf6d30dfea45c0cc76fb02b4b4ef619fe315a7 --- /dev/null +++ b/pkg/simple/client/auditing/interface.go @@ -0,0 +1,65 @@ +/* +Copyright 2020 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package auditing + +import ( + "time" +) + +type Client interface { + SearchAuditingEvent(filter *Filter, from, size int64, sort string) (*Events, error) + CountOverTime(filter *Filter, interval string) (*Histogram, error) + StatisticsOnResources(filter *Filter) (*Statistics, error) +} + +type Filter struct { + ObjectRefNamespaceMap map[string]time.Time + ObjectRefNames []string + ObjectRefNameFuzzy []string + Levels []string + Verbs []string + Users []string + UserFuzzy []string + GroupFuzzy []string + SourceIpFuzzy []string + ObjectRefResources []string + ObjectRefSubresources []string + ResponseStatus []int32 + StartTime *time.Time + EndTime *time.Time +} + +type Event map[string]interface{} + +type Events struct { + Total int64 `json:"total" description:"total number of matched results"` + Records []*Event `json:"records" description:"actual array of results"` +} + +type Histogram struct { + Total int64 `json:"total" description:"total number of events"` + Buckets []Bucket `json:"buckets" description:"actual array of histogram results"` +} +type Bucket struct { + Time int64 `json:"time" description:"timestamp"` + Count int64 `json:"count" description:"total number of events at intervals"` +} + +type Statistics struct { + Resources int64 `json:"resources" description:"total number of resources"` + Events int64 `json:"events" description:"total number of events"` +} diff --git a/tools/cmd/doc-gen/main.go b/tools/cmd/doc-gen/main.go index b4d7cdb6ce4904faca504b8e01e74eda6813dc83..dc3fabcb02a27fcce442d02b02708e21aa77593b 100644 --- a/tools/cmd/doc-gen/main.go +++ b/tools/cmd/doc-gen/main.go @@ -117,7 +117,7 @@ func generateSwaggerJson() []byte { urlruntime.Must(operationsv1alpha2.AddToContainer(container, clientsets.Kubernetes())) urlruntime.Must(resourcesv1alpha2.AddToContainer(container, clientsets.Kubernetes(), informerFactory)) urlruntime.Must(resourcesv1alpha3.AddToContainer(container, informerFactory)) - urlruntime.Must(tenantv1alpha2.AddToContainer(container, informerFactory, nil, nil, nil, nil)) + urlruntime.Must(tenantv1alpha2.AddToContainer(container, informerFactory, nil, nil, nil, nil, nil)) urlruntime.Must(terminalv1alpha2.AddToContainer(container, clientsets.Kubernetes(), nil)) urlruntime.Must(metricsv1alpha2.AddToContainer(container)) urlruntime.Must(networkv1alpha2.AddToContainer(container, ""))