提交 951c3118 编写于 作者: R root

add auditing events search api

上级 ea2e8769
......@@ -11,6 +11,7 @@ import (
apiserverconfig "kubesphere.io/kubesphere/pkg/apiserver/config"
"kubesphere.io/kubesphere/pkg/informers"
genericoptions "kubesphere.io/kubesphere/pkg/server/options"
auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch"
"kubesphere.io/kubesphere/pkg/simple/client/cache"
"kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins"
eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events/elasticsearch"
......@@ -56,6 +57,7 @@ func NewServerRunOptions() *ServerRunOptions {
AuthenticationOptions: authoptions.NewAuthenticateOptions(),
MultiClusterOptions: multicluster.NewOptions(),
EventsOptions: eventsclient.NewElasticSearchOptions(),
AuditingOptions: auditingclient.NewElasticSearchOptions(),
},
}
......@@ -81,6 +83,7 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
s.LoggingOptions.AddFlags(fss.FlagSet("logging"), s.LoggingOptions)
s.MultiClusterOptions.AddFlags(fss.FlagSet("multicluster"), s.MultiClusterOptions)
s.EventsOptions.AddFlags(fss.FlagSet("events"), s.EventsOptions)
s.AuditingOptions.AddFlags(fss.FlagSet("auditing"), s.AuditingOptions)
fs = fss.FlagSet("klog")
local := flag.NewFlagSet("klog", flag.ExitOnError)
......@@ -188,6 +191,14 @@ func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIS
apiServer.EventsClient = eventsClient
}
if s.AuditingOptions.Host != "" {
auditingClient, err := auditingclient.NewClient(s.AuditingOptions)
if err != nil {
return nil, err
}
apiServer.AuditingClient = auditingClient
}
if s.OpenPitrixOptions != nil {
opClient, err := openpitrix.NewClient(s.OpenPitrixOptions)
if err != nil {
......
......@@ -17,6 +17,7 @@ func (s *ServerRunOptions) Validate() []error {
errors = append(errors, s.LoggingOptions.Validate()...)
errors = append(errors, s.AuthorizationOptions.Validate()...)
errors = append(errors, s.EventsOptions.Validate()...)
errors = append(errors, s.AuditingOptions.Validate()...)
return errors
}
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"github.com/emicklei/go-restful"
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
"strconv"
"time"
)
type APIResponse struct {
Events *auditing.Events `json:"query,omitempty" description:"query results"`
Statistics *auditing.Statistics `json:"statistics,omitempty" description:"statistics results"`
Histogram *auditing.Histogram `json:"histogram,omitempty" description:"histogram results"`
}
type Query struct {
Operation string `json:"operation,omitempty"`
WorkspaceFilter string `json:"workspace_filter,omitempty"`
WorkspaceSearch string `json:"workspace_search,omitempty"`
ObjectRefNamespaceFilter string `json:"objectref_namespace_filter,omitempty"`
ObjectRefNamespaceSearch string `json:"objectref_namespace_search,omitempty"`
ObjectRefNameFilter string `json:"objectref_name_filter,omitempty"`
ObjectRefNameSearch string `json:"objectref_name_search,omitempty"`
LevelFilter string `json:"level_filter,omitempty"`
VerbFilter string `json:"verb_filter,omitempty"`
UserFilter string `json:"user_filter,omitempty"`
UserSearch string `json:"user_search,omitempty"`
GroupSearch string `json:"group_search,omitempty"`
SourceIpSearch string `json:"source_ip_search,omitempty"`
ObjectRefResourceFilter string `json:"objectref_resource_filter,omitempty"`
ObjectRefSubresourceFilter string `json:"objectref_subresource_filter,omitempty"`
ResponesStatusFilter string `json:"response_status_filter,omitempty"`
StartTime *time.Time `json:"start_time,omitempty"`
EndTime *time.Time `json:"end_time,omitempty"`
Interval string `json:"interval,omitempty"`
Sort string `json:"sort,omitempty"`
From int64 `json:"from,omitempty"`
Size int64 `json:"size,omitempty"`
}
func ParseQueryParameter(req *restful.Request) (*Query, error) {
q := &Query{}
q.Operation = req.QueryParameter("operation")
q.WorkspaceFilter = req.QueryParameter("workspace_filter")
q.WorkspaceSearch = req.QueryParameter("workspace_search")
q.ObjectRefNamespaceFilter = req.QueryParameter("objectref_namespace_filter")
q.ObjectRefNamespaceSearch = req.QueryParameter("objectref_namespace_search")
q.ObjectRefNameFilter = req.QueryParameter("objectref_name_filter")
q.ObjectRefNameSearch = req.QueryParameter("objectref_name_search")
q.LevelFilter = req.QueryParameter("level_filter")
q.VerbFilter = req.QueryParameter("verb_filter")
q.SourceIpSearch = req.QueryParameter("source_ip_search")
q.UserFilter = req.QueryParameter("user_filter")
q.UserSearch = req.QueryParameter("user_search")
q.GroupSearch = req.QueryParameter("group_search")
q.ObjectRefResourceFilter = req.QueryParameter("objectref_resource_filter")
q.ObjectRefSubresourceFilter = req.QueryParameter("objectref_subresource_filter")
q.ResponesStatusFilter = req.QueryParameter("response_status_filter")
if tstr := req.QueryParameter("start_time"); tstr != "" {
sec, err := strconv.ParseInt(tstr, 10, 64)
if err != nil {
return nil, err
}
t := time.Unix(sec, 0)
q.StartTime = &t
}
if tstr := req.QueryParameter("end_time"); tstr != "" {
sec, err := strconv.ParseInt(tstr, 10, 64)
if err != nil {
return nil, err
}
t := time.Unix(sec, 0)
q.EndTime = &t
}
if q.Interval = req.QueryParameter("interval"); q.Interval == "" {
q.Interval = "15m"
}
q.From, _ = strconv.ParseInt(req.QueryParameter("from"), 10, 64)
size, err := strconv.ParseInt(req.QueryParameter("size"), 10, 64)
if err != nil {
size = 10
}
q.Size = size
if q.Sort = req.QueryParameter("sort"); q.Sort != "asc" {
q.Sort = "desc"
}
return q, nil
}
......@@ -63,6 +63,7 @@ import (
"kubesphere.io/kubesphere/pkg/kapis/version"
"kubesphere.io/kubesphere/pkg/models/iam/am"
"kubesphere.io/kubesphere/pkg/models/iam/im"
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
"kubesphere.io/kubesphere/pkg/simple/client/cache"
"kubesphere.io/kubesphere/pkg/simple/client/devops"
"kubesphere.io/kubesphere/pkg/simple/client/events"
......@@ -135,6 +136,8 @@ type APIServer struct {
SonarClient sonarqube.SonarInterface
EventsClient events.Client
AuditingClient auditing.Client
}
func (s *APIServer) PrepareRun() error {
......@@ -172,7 +175,7 @@ func (s *APIServer) installKubeSphereAPIs() {
urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory,
s.KubernetesClient.Master()))
urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),
s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient))
s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient))
urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.KubernetesClient.Config()))
urlruntime.Must(clusterkapisv1alpha1.AddToContainer(s.container,
s.InformerFactory.KubernetesSharedInformerFactory(),
......
......@@ -22,6 +22,7 @@ import (
authoptions "kubesphere.io/kubesphere/pkg/apiserver/authentication/options"
authorizationoptions "kubesphere.io/kubesphere/pkg/apiserver/authorization/options"
"kubesphere.io/kubesphere/pkg/simple/client/alerting"
auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch"
"kubesphere.io/kubesphere/pkg/simple/client/cache"
"kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins"
eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events/elasticsearch"
......@@ -92,6 +93,7 @@ type Config struct {
AuthorizationOptions *authorizationoptions.AuthorizationOptions `json:"authorization,omitempty" yaml:"authorization,omitempty" mapstructure:"authorization"`
MultiClusterOptions *multicluster.Options `json:"multicluster,omitempty" yaml:"multicluster,omitempty" mapstructure:"multicluster"`
EventsOptions *eventsclient.Options `json:"events,omitempty" yaml:"events,omitempty" mapstructure:"events"`
AuditingOptions *auditingclient.Options `json:"auditing,omitempty" yaml:"auditing,omitempty" mapstructure:"auditing"`
// Options used for enabling components, not actually used now. Once we switch Alerting/Notification API to kubesphere,
// we can add these options to kubesphere command lines
AlertingOptions *alerting.Options `json:"alerting,omitempty" yaml:"alerting,omitempty" mapstructure:"alerting"`
......@@ -118,6 +120,7 @@ func New() *Config {
AuthorizationOptions: authorizationoptions.NewAuthorizationOptions(),
MultiClusterOptions: multicluster.NewOptions(),
EventsOptions: eventsclient.NewElasticSearchOptions(),
AuditingOptions: auditingclient.NewElasticSearchOptions(),
}
}
......@@ -236,4 +239,8 @@ func (conf *Config) stripEmptyOptions() {
if conf.EventsOptions != nil && conf.EventsOptions.Host == "" {
conf.EventsOptions = nil
}
if conf.AuditingOptions != nil && conf.AuditingOptions.Host == "" {
conf.AuditingOptions = nil
}
}
......@@ -25,6 +25,7 @@ import (
authoptions "kubesphere.io/kubesphere/pkg/apiserver/authentication/options"
authorizationoptions "kubesphere.io/kubesphere/pkg/apiserver/authorization/options"
"kubesphere.io/kubesphere/pkg/simple/client/alerting"
auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch"
"kubesphere.io/kubesphere/pkg/simple/client/cache"
"kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins"
eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events/elasticsearch"
......@@ -146,6 +147,11 @@ func newTestConfig() (*Config, error) {
IndexPrefix: "ks-logstash-events",
Version: "6",
},
AuditingOptions: &auditingclient.Options{
Host: "http://elasticsearch-logging-data.kubesphere-logging-system.svc:9200",
IndexPrefix: "ks-logstash-auditing",
Version: "6",
},
}
return conf, nil
}
......
......@@ -83,6 +83,7 @@ const (
LogQueryTag = "Log Query"
TerminalTag = "Terminal"
EventsQueryTag = "Events Query"
AuditingQueryTag = "Auditing Query"
)
var (
......
......@@ -8,6 +8,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/api"
auditingv1alpha1 "kubesphere.io/kubesphere/pkg/api/auditing/v1alpha1"
eventsv1alpha1 "kubesphere.io/kubesphere/pkg/api/events/v1alpha1"
loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2"
tenantv1alpha2 "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha2"
......@@ -17,6 +18,7 @@ import (
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/models/tenant"
servererr "kubesphere.io/kubesphere/pkg/server/errors"
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
"kubesphere.io/kubesphere/pkg/simple/client/events"
"kubesphere.io/kubesphere/pkg/simple/client/logging"
)
......@@ -25,10 +27,10 @@ type tenantHandler struct {
tenant tenant.Interface
}
func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Interface) *tenantHandler {
func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Interface, auditingclient auditing.Client) *tenantHandler {
return &tenantHandler{
tenant: tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient),
tenant: tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient),
}
}
......@@ -282,3 +284,29 @@ func (h *tenantHandler) QueryLogs(req *restful.Request, resp *restful.Response)
resp.WriteAsJson(result)
}
}
func (h *tenantHandler) Auditing(req *restful.Request, resp *restful.Response) {
user, ok := request.UserFrom(req.Request.Context())
if !ok {
err := fmt.Errorf("cannot obtain user info")
klog.Errorln(err)
api.HandleForbidden(resp, req, err)
return
}
queryParam, err := auditingv1alpha1.ParseQueryParameter(req)
if err != nil {
klog.Errorln(err)
api.HandleInternalError(resp, req, err)
return
}
result, err := h.tenant.Auditing(user, queryParam)
if err != nil {
klog.Errorln(err)
api.HandleInternalError(resp, req, err)
return
}
_ = resp.WriteEntity(result)
}
......@@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"kubesphere.io/kubesphere/pkg/api"
auditingv1alpha1 "kubesphere.io/kubesphere/pkg/api/auditing/v1alpha1"
eventsv1alpha1 "kubesphere.io/kubesphere/pkg/api/events/v1alpha1"
loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2"
tenantv1alpha2 "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha2"
......@@ -32,6 +33,7 @@ import (
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/models"
"kubesphere.io/kubesphere/pkg/server/errors"
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
"kubesphere.io/kubesphere/pkg/simple/client/events"
"kubesphere.io/kubesphere/pkg/simple/client/logging"
"net/http"
......@@ -43,9 +45,9 @@ const (
var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha2"}
func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Interface) error {
func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Interface, auditingclient auditing.Client) error {
ws := runtime.NewWebService(GroupVersion)
handler := newTenantHandler(factory, k8sclient, ksclient, evtsClient, loggingClient)
handler := newTenantHandler(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient)
ws.Route(ws.POST("/workspaces").
To(handler.CreateWorkspace).
......@@ -146,6 +148,35 @@ func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8s
Consumes(restful.MIME_JSON, restful.MIME_XML).
Produces(restful.MIME_JSON, "text/plain")
ws.Route(ws.GET("/auditing/events").
To(handler.Auditing).
Doc("Query auditing events against the cluster").
Param(ws.QueryParameter("operation", "Operation type. This can be one of three types: `query` (for querying events), `statistics` (for retrieving statistical data), `histogram` (for displaying events count by time interval). Defaults to query.").DefaultValue("query")).
Param(ws.QueryParameter("workspace_filter", "A comma-separated list of workspaces. This field restricts the query to specified workspaces. For example, the following filter matches the workspace my-ws and demo-ws: `my-ws,demo-ws`.")).
Param(ws.QueryParameter("workspace_search", "A comma-separated list of keywords. Differing from **workspace_filter**, this field performs fuzzy matching on workspaces. For example, the following value limits the query to workspaces whose name contains the word my(My,MY,...) *OR* demo(Demo,DemO,...): `my,demo`.")).
Param(ws.QueryParameter("objectref_namespace_filter", "A comma-separated list of namespaces. This field restricts the query to specified `ObjectRef.Namespace`.")).
Param(ws.QueryParameter("objectref_namespace_search", "A comma-separated list of keywords. Differing from **objectref_namespace_filter**, this field performs fuzzy matching on `ObjectRef.Namespace`.")).
Param(ws.QueryParameter("objectref_name_filter", "A comma-separated list of names. This field restricts the query to specified `ObjectRef.Name`.")).
Param(ws.QueryParameter("objectref_name_search", "A comma-separated list of keywords. Differing from **objectref_name_filter**, this field performs fuzzy matching on `ObjectRef.Name`.")).
Param(ws.QueryParameter("level_filter", "A comma-separated list of levels. This know values are Metadata, Request, RequestResponse.")).
Param(ws.QueryParameter("verb_filter", "A comma-separated list of verbs. This field restricts the query to specified verb. This field restricts the query to specified `Verb`.")).
Param(ws.QueryParameter("user_filter", "A comma-separated list of user. This field restricts the query to specified user. For example, the following filter matches the user user1 and user2: `user1,user2`.")).
Param(ws.QueryParameter("user_search", "A comma-separated list of keywords. Differing from **user_filter**, this field performs fuzzy matching on 'User.username'. For example, the following value limits the query to user whose name contains the word my(My,MY,...) *OR* demo(Demo,DemO,...): `my,demo`.")).
Param(ws.QueryParameter("group_search", "A comma-separated list of keywords. This field performs fuzzy matching on 'User.Groups'. For example, the following value limits the query to group which contains the word my(My,MY,...) *OR* demo(Demo,DemO,...): `my,demo`.")).
Param(ws.QueryParameter("source_ip_search", "A comma-separated list of keywords. This field performs fuzzy matching on 'SourceIPs'. For example, the following value limits the query to SourceIPs which contains 127.0 *OR* 192.168.: `127.0,192.168.`.")).
Param(ws.QueryParameter("objectref_resource_filter", "A comma-separated list of resource. This field restricts the query to specified ip. This field restricts the query to specified `ObjectRef.Resource`.")).
Param(ws.QueryParameter("objectref_subresource_filter", "A comma-separated list of subresource. This field restricts the query to specified subresource. This field restricts the query to specified `ObjectRef.Subresource`.")).
Param(ws.QueryParameter("response_status_filter", "A comma-separated list of response status code. This field restricts the query to specified response status code. This field restricts the query to specified `ResponseStatus.Code`.")).
Param(ws.QueryParameter("start_time", "Start time of query (limits `RequestReceivedTimestamp`). The format is a string representing seconds since the epoch, eg. 1136214245.")).
Param(ws.QueryParameter("end_time", "End time of query (limits `RequestReceivedTimestamp`). The format is a string representing seconds since the epoch, eg. 1136214245.")).
Param(ws.QueryParameter("interval", "Time interval. It requires **operation** is set to `histogram`. The format is [0-9]+[smhdwMqy]. Defaults to 15m (i.e. 15 min).").DefaultValue("15m")).
Param(ws.QueryParameter("sort", "Sort order. One of asc, desc. This field sorts events by `RequestReceivedTimestamp`.").DataType("string").DefaultValue("desc")).
Param(ws.QueryParameter("from", "The offset from the result set. This field returns query results from the specified offset. It requires **operation** is set to `query`. Defaults to 0 (i.e. from the beginning of the result set).").DataType("integer").DefaultValue("0").Required(false)).
Param(ws.QueryParameter("size", "Size of result set to return. It requires **operation** is set to `query`. Defaults to 10 (i.e. 10 event records).").DataType("integer").DefaultValue("10").Required(false)).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.AuditingQueryTag}).
Writes(auditingv1alpha1.APIResponse{}).
Returns(http.StatusOK, api.StatusOK, auditingv1alpha1.APIResponse{}))
c.Add(ws)
return nil
}
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package auditing
import (
"kubesphere.io/kubesphere/pkg/api/auditing/v1alpha1"
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
"kubesphere.io/kubesphere/pkg/utils/stringutils"
"strconv"
)
type Interface interface {
Events(queryParam *v1alpha1.Query, MutateFilterFunc func(*auditing.Filter)) (*v1alpha1.APIResponse, error)
}
type eventsOperator struct {
client auditing.Client
}
func NewEventsOperator(client auditing.Client) Interface {
return &eventsOperator{client}
}
func (eo *eventsOperator) Events(queryParam *v1alpha1.Query,
MutateFilterFunc func(*auditing.Filter)) (*v1alpha1.APIResponse, error) {
filter := &auditing.Filter{
ObjectRefNames: stringutils.Split(queryParam.ObjectRefNameFilter, ","),
ObjectRefNameFuzzy: stringutils.Split(queryParam.ObjectRefNameSearch, ","),
Levels: stringutils.Split(queryParam.LevelFilter, ","),
Verbs: stringutils.Split(queryParam.VerbFilter, ","),
Users: stringutils.Split(queryParam.UserFilter, ","),
UserFuzzy: stringutils.Split(queryParam.UserSearch, ","),
GroupFuzzy: stringutils.Split(queryParam.GroupSearch, ","),
SourceIpFuzzy: stringutils.Split(queryParam.SourceIpSearch, ","),
ObjectRefResources: stringutils.Split(queryParam.ObjectRefResourceFilter, ","),
ObjectRefSubresources: stringutils.Split(queryParam.ObjectRefSubresourceFilter, ","),
StartTime: queryParam.StartTime,
EndTime: queryParam.EndTime,
}
if MutateFilterFunc != nil {
MutateFilterFunc(filter)
}
cs := stringutils.Split(queryParam.ResponesStatusFilter, ",")
for _, c := range cs {
code, err := strconv.ParseInt(c, 10, 64)
if err != nil {
continue
}
filter.ResponseStatus = append(filter.ResponseStatus, int32(code))
}
var ar v1alpha1.APIResponse
var err error
switch queryParam.Operation {
case "histogram":
if len(filter.ObjectRefNamespaceMap) == 0 {
ar.Histogram = &auditing.Histogram{}
} else {
ar.Histogram, err = eo.client.CountOverTime(filter, queryParam.Interval)
}
case "statistics":
if len(filter.ObjectRefNamespaceMap) == 0 {
ar.Statistics = &auditing.Statistics{}
} else {
ar.Statistics, err = eo.client.StatisticsOnResources(filter)
}
default:
if len(filter.ObjectRefNamespaceMap) == 0 {
ar.Events = &auditing.Events{}
} else {
ar.Events, err = eo.client.SearchAuditingEvent(filter, queryParam.From, queryParam.Size, queryParam.Sort)
}
}
if err != nil {
return nil, err
}
return &ar, nil
}
......@@ -27,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/api"
auditingv1alpha1 "kubesphere.io/kubesphere/pkg/api/auditing/v1alpha1"
eventsv1alpha1 "kubesphere.io/kubesphere/pkg/api/events/v1alpha1"
loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2"
clusterv1alpha1 "kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1"
......@@ -37,11 +38,13 @@ import (
"kubesphere.io/kubesphere/pkg/apiserver/query"
kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/models/auditing"
"kubesphere.io/kubesphere/pkg/models/events"
"kubesphere.io/kubesphere/pkg/models/iam/am"
"kubesphere.io/kubesphere/pkg/models/logging"
resources "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3"
resourcesv1alpha3 "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/resource"
auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing"
eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events"
loggingclient "kubesphere.io/kubesphere/pkg/simple/client/logging"
"kubesphere.io/kubesphere/pkg/utils/stringutils"
......@@ -62,6 +65,7 @@ type Interface interface {
Events(user user.Info, queryParam *eventsv1alpha1.Query) (*eventsv1alpha1.APIResponse, error)
QueryLogs(user user.Info, query *loggingv1alpha2.Query) (*loggingv1alpha2.APIResponse, error)
ExportLogs(user user.Info, query *loggingv1alpha2.Query, writer io.Writer) error
Auditing(user user.Info, queryParam *auditingv1alpha1.Query) (*auditingv1alpha1.APIResponse, error)
}
type tenantOperator struct {
......@@ -72,9 +76,10 @@ type tenantOperator struct {
resourceGetter *resourcesv1alpha3.ResourceGetter
events events.Interface
lo logging.LoggingOperator
auditing auditing.Interface
}
func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Interface) Interface {
func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Interface, auditingclient auditingclient.Client) Interface {
amOperator := am.NewReadOnlyOperator(informers)
authorizer := authorizerfactory.NewRBACAuthorizer(amOperator)
return &tenantOperator{
......@@ -85,6 +90,7 @@ func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ks
ksclient: ksclient,
events: events.NewEventsOperator(evtsClient),
lo: logging.NewLoggingOperator(loggingClient),
auditing: auditing.NewEventsOperator(auditingclient),
}
}
......@@ -523,6 +529,48 @@ func (t *tenantOperator) ExportLogs(user user.Info, query *loggingv1alpha2.Query
}
}
func (t *tenantOperator) Auditing(user user.Info, queryParam *auditingv1alpha1.Query) (*auditingv1alpha1.APIResponse, error) {
iNamespaces, err := t.listIntersectedNamespaces(user,
stringutils.Split(queryParam.WorkspaceFilter, ","),
stringutils.Split(queryParam.WorkspaceSearch, ","),
stringutils.Split(queryParam.ObjectRefNamespaceFilter, ","),
stringutils.Split(queryParam.ObjectRefNamespaceSearch, ","))
if err != nil {
klog.Error(err)
return nil, err
}
namespaceCreateTimeMap := make(map[string]time.Time)
for _, ns := range iNamespaces {
namespaceCreateTimeMap[ns.Name] = ns.CreationTimestamp.Time
}
// If there are no ns and ws query conditions,
// those events with empty `ObjectRef.Namespace` will also be listed when user can list all namespaces
if len(queryParam.WorkspaceFilter) == 0 && len(queryParam.ObjectRefNamespaceFilter) == 0 &&
len(queryParam.WorkspaceSearch) == 0 && len(queryParam.ObjectRefNamespaceSearch) == 0 {
listEvts := authorizer.AttributesRecord{
User: user,
Verb: "list",
APIGroup: "",
APIVersion: "v1",
Resource: "namespaces",
ResourceRequest: true,
}
decision, _, err := t.authorizer.Authorize(listEvts)
if err != nil {
klog.Error(err)
return nil, err
}
if decision == authorizer.DecisionAllow {
namespaceCreateTimeMap[""] = time.Time{}
}
}
return t.auditing.Events(queryParam, func(filter *auditingclient.Filter) {
filter.ObjectRefNamespaceMap = namespaceCreateTimeMap
})
}
func contains(objects []runtime.Object, object runtime.Object) bool {
for _, item := range objects {
if item == object {
......
......@@ -328,5 +328,5 @@ func prepare() Interface {
RoleBindings().Informer().GetIndexer().Add(roleBinding)
}
return New(fakeInformerFactory, nil, nil, nil, nil)
return New(fakeInformerFactory, nil, nil, nil, nil, nil)
}
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package elasticsearch
import (
"fmt"
es5 "github.com/elastic/go-elasticsearch/v5"
es5api "github.com/elastic/go-elasticsearch/v5/esapi"
es6 "github.com/elastic/go-elasticsearch/v6"
es6api "github.com/elastic/go-elasticsearch/v6/esapi"
es7 "github.com/elastic/go-elasticsearch/v7"
es7api "github.com/elastic/go-elasticsearch/v7/esapi"
jsoniter "github.com/json-iterator/go"
"io"
"net/http"
)
type Request struct {
Index string
Body io.Reader
}
type Response struct {
Hits Hits `json:"hits"`
Aggregations map[string]jsoniter.RawMessage `json:"aggregations"`
}
type Hits struct {
Total int64 `json:"total"`
Hits jsoniter.RawMessage `json:"hits"`
}
type Error struct {
Type string `json:"type"`
Reason string `json:"reason"`
Status int `json:"status"`
}
func (e Error) Error() string {
return fmt.Sprintf("%s %s: %s", http.StatusText(e.Status), e.Type, e.Reason)
}
type ClientV5 es5.Client
func (c *ClientV5) ExSearch(r *Request) (*Response, error) {
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
}
func (c *ClientV5) parse(resp *es5api.Response, err error) (*Response, error) {
if err != nil {
return nil, fmt.Errorf("error getting response: %s", err)
}
defer func() {
_ = resp.Body.Close()
}()
if resp.IsError() {
return nil, fmt.Errorf(resp.String())
}
var r struct {
Hits struct {
Total int64 `json:"total"`
Hits jsoniter.RawMessage `json:"hits"`
} `json:"hits"`
Aggregations map[string]jsoniter.RawMessage `json:"aggregations"`
}
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return nil, fmt.Errorf("error parsing the response body: %s", err)
}
return &Response{
Hits: Hits{Total: r.Hits.Total, Hits: r.Hits.Hits},
Aggregations: r.Aggregations,
}, nil
}
func (c *ClientV5) Version() (string, error) {
resp, err := c.Info()
if err != nil {
return "", err
}
defer func() {
_ = resp.Body.Close()
}()
if resp.IsError() {
return "", fmt.Errorf(resp.String())
}
var r map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return "", fmt.Errorf("error parsing the response body: %s", err)
}
return fmt.Sprintf("%s", r["version"].(map[string]interface{})["number"]), nil
}
type ClientV6 es6.Client
func (c *ClientV6) ExSearch(r *Request) (*Response, error) {
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
}
func (c *ClientV6) parse(resp *es6api.Response, err error) (*Response, error) {
if err != nil {
return nil, fmt.Errorf("error getting response: %s", err)
}
defer func() {
_ = resp.Body.Close()
}()
if resp.IsError() {
return nil, fmt.Errorf(resp.String())
}
var r struct {
Hits *struct {
Total int64 `json:"total"`
Hits jsoniter.RawMessage `json:"hits"`
} `json:"hits"`
Aggregations map[string]jsoniter.RawMessage `json:"aggregations"`
}
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return nil, fmt.Errorf("error parsing the response body: %s", err)
}
return &Response{
Hits: Hits{Total: r.Hits.Total, Hits: r.Hits.Hits},
Aggregations: r.Aggregations,
}, nil
}
type ClientV7 es7.Client
func (c *ClientV7) ExSearch(r *Request) (*Response, error) {
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
}
func (c *ClientV7) parse(resp *es7api.Response, err error) (*Response, error) {
if err != nil {
return nil, fmt.Errorf("error getting response: %s", err)
}
defer func() {
_ = resp.Body.Close()
}()
if resp.IsError() {
return nil, fmt.Errorf(resp.String())
}
var r struct {
Hits *struct {
Total struct {
Value int64 `json:"value"`
} `json:"total"`
Hits jsoniter.RawMessage `json:"hits"`
} `json:"hits"`
Aggregations map[string]jsoniter.RawMessage `json:"aggregations"`
}
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return nil, fmt.Errorf("error parsing the response body: %s", err)
}
return &Response{
Hits: Hits{Total: r.Hits.Total.Value, Hits: r.Hits.Hits},
Aggregations: r.Aggregations,
}, nil
}
type client interface {
ExSearch(r *Request) (*Response, error)
}
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package elasticsearch
import (
"bytes"
"fmt"
"strings"
"time"
es5 "github.com/elastic/go-elasticsearch/v5"
es6 "github.com/elastic/go-elasticsearch/v6"
es7 "github.com/elastic/go-elasticsearch/v7"
jsoniter "github.com/json-iterator/go"
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Elasticsearch struct {
c client
opts struct {
index string
}
}
func (es *Elasticsearch) SearchAuditingEvent(filter *auditing.Filter, from, size int64,
sort string) (*auditing.Events, error) {
queryPart := parseToQueryPart(filter)
if sort == "" {
sort = "desc"
}
sortPart := []map[string]interface{}{{
"RequestReceivedTimestamp": map[string]string{"order": sort},
}}
b := map[string]interface{}{
"from": from,
"size": size,
"query": queryPart,
"sort": sortPart,
}
body, err := json.Marshal(b)
if err != nil {
return nil, err
}
resp, err := es.c.ExSearch(&Request{
Index: es.opts.index,
Body: bytes.NewBuffer(body),
})
if err != nil || resp == nil {
return nil, err
}
var innerHits []struct {
*auditing.Event `json:"_source"`
}
if err := json.Unmarshal(resp.Hits.Hits, &innerHits); err != nil {
return nil, err
}
evts := auditing.Events{Total: resp.Hits.Total}
for _, hit := range innerHits {
evts.Records = append(evts.Records, hit.Event)
}
return &evts, nil
}
func (es *Elasticsearch) CountOverTime(filter *auditing.Filter, interval string) (*auditing.Histogram, error) {
if interval == "" {
interval = "15m"
}
queryPart := parseToQueryPart(filter)
aggName := "events_count_over_timestamp"
aggsPart := map[string]interface{}{
aggName: map[string]interface{}{
"date_histogram": map[string]string{
"field": "RequestReceivedTimestamp",
"interval": interval,
},
},
}
b := map[string]interface{}{
"query": queryPart,
"aggs": aggsPart,
"size": 0, // do not get docs
}
body, err := json.Marshal(b)
if err != nil {
return nil, err
}
resp, err := es.c.ExSearch(&Request{
Index: es.opts.index,
Body: bytes.NewBuffer(body),
})
if err != nil || resp == nil {
return nil, err
}
raw := resp.Aggregations[aggName]
var agg struct {
Buckets []struct {
KeyAsString string `json:"key_as_string"`
Key int64 `json:"key"`
DocCount int64 `json:"doc_count"`
} `json:"buckets"`
}
if err := json.Unmarshal(raw, &agg); err != nil {
return nil, err
}
histo := auditing.Histogram{Total: int64(len(agg.Buckets))}
for _, b := range agg.Buckets {
histo.Buckets = append(histo.Buckets,
auditing.Bucket{Time: b.Key, Count: b.DocCount})
}
return &histo, nil
}
func (es *Elasticsearch) StatisticsOnResources(filter *auditing.Filter) (*auditing.Statistics, error) {
queryPart := parseToQueryPart(filter)
aggName := "resources_count"
aggsPart := map[string]interface{}{
aggName: map[string]interface{}{
"cardinality": map[string]string{
"field": "AuditID.keyword",
},
},
}
b := map[string]interface{}{
"query": queryPart,
"aggs": aggsPart,
"size": 0, // do not get docs
}
body, err := json.Marshal(b)
if err != nil {
return nil, err
}
resp, err := es.c.ExSearch(&Request{
Index: es.opts.index,
Body: bytes.NewBuffer(body),
})
if err != nil || resp == nil {
return nil, err
}
raw := resp.Aggregations[aggName]
var agg struct {
Value int64 `json:"value"`
}
if err := json.Unmarshal(raw, &agg); err != nil {
return nil, err
}
return &auditing.Statistics{
Resources: agg.Value,
Events: resp.Hits.Total,
}, nil
}
func NewClient(options *Options) (*Elasticsearch, error) {
clientV5 := func() (*ClientV5, error) {
c, err := es5.NewClient(es5.Config{Addresses: []string{options.Host}})
if err != nil {
return nil, err
}
return (*ClientV5)(c), nil
}
clientV6 := func() (*ClientV6, error) {
c, err := es6.NewClient(es6.Config{Addresses: []string{options.Host}})
if err != nil {
return nil, err
}
return (*ClientV6)(c), nil
}
clientV7 := func() (*ClientV7, error) {
c, err := es7.NewClient(es7.Config{Addresses: []string{options.Host}})
if err != nil {
return nil, err
}
return (*ClientV7)(c), nil
}
var (
version = options.Version
es = Elasticsearch{}
err error
)
es.opts.index = fmt.Sprintf("%s*", options.IndexPrefix)
if options.Version == "" {
var c5 *ClientV5
if c5, err = clientV5(); err == nil {
if version, err = c5.Version(); err == nil {
es.c = c5
}
}
}
if err != nil {
return nil, err
}
switch strings.Split(version, ".")[0] {
case "5":
if es.c == nil {
es.c, err = clientV5()
}
case "6":
es.c, err = clientV6()
case "7":
es.c, err = clientV7()
default:
err = fmt.Errorf("unsupported elasticsearch version %s", version)
}
if err != nil {
return nil, err
}
return &es, nil
}
func parseToQueryPart(f *auditing.Filter) interface{} {
if f == nil {
return nil
}
type BoolBody struct {
Filter []map[string]interface{} `json:"filter,omitempty"`
Should []map[string]interface{} `json:"should,omitempty"`
MinimumShouldMatch *int `json:"minimum_should_match,omitempty"`
}
var mini = 1
b := BoolBody{}
queryBody := map[string]interface{}{
"bool": &b,
}
if len(f.ObjectRefNamespaceMap) > 0 {
bi := BoolBody{MinimumShouldMatch: &mini}
for k, v := range f.ObjectRefNamespaceMap {
bi.Should = append(bi.Should, map[string]interface{}{
"bool": &BoolBody{
Filter: []map[string]interface{}{{
"match_phrase": map[string]string{"ObjectRef.Namespace.keyword": k},
}, {
"range": map[string]interface{}{
"RequestReceivedTimestamp": map[string]interface{}{
"gte": v,
},
},
}},
},
})
}
if len(bi.Should) > 0 {
b.Filter = append(b.Filter, map[string]interface{}{"bool": &bi})
}
}
shouldBoolbody := func(mtype, fieldName string, fieldValues []string, fieldValueMutate func(string) string) *BoolBody {
bi := BoolBody{MinimumShouldMatch: &mini}
for _, v := range fieldValues {
if fieldValueMutate != nil {
v = fieldValueMutate(v)
}
bi.Should = append(bi.Should, map[string]interface{}{
mtype: map[string]string{fieldName: v},
})
}
if len(bi.Should) == 0 {
return nil
}
return &bi
}
if len(f.ObjectRefNames) > 0 {
if bi := shouldBoolbody("match_phrase_prefix", "ObjectRef.Name.keyword",
f.ObjectRefNames, nil); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.ObjectRefNameFuzzy) > 0 {
if bi := shouldBoolbody("wildcard", "ObjectRef.Name",
f.ObjectRefNameFuzzy, func(s string) string {
return fmt.Sprintf("*" + s + "*")
}); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.Verbs) > 0 {
if bi := shouldBoolbody("match_phrase", "Verb",
f.Verbs, nil); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.Levels) > 0 {
if bi := shouldBoolbody("match_phrase", "Level",
f.Levels, nil); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.SourceIpFuzzy) > 0 {
if bi := shouldBoolbody("wildcard", "SourceIPs",
f.SourceIpFuzzy, func(s string) string {
return fmt.Sprintf("*" + s + "*")
}); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.Users) > 0 {
if bi := shouldBoolbody("match_phrase", "User.Username.keyword",
f.Users, nil); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.UserFuzzy) > 0 {
if bi := shouldBoolbody("wildcard", "User.Username",
f.UserFuzzy, func(s string) string {
return fmt.Sprintf("*" + s + "*")
}); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.GroupFuzzy) > 0 {
if bi := shouldBoolbody("wildcard", "User.Groups",
f.GroupFuzzy, func(s string) string {
return fmt.Sprintf("*" + s + "*")
}); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.ObjectRefResources) > 0 {
if bi := shouldBoolbody("match_phrase_prefix", "ObjectRef.Resource.keyword",
f.ObjectRefResources, nil); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.ObjectRefSubresources) > 0 {
if bi := shouldBoolbody("match_phrase_prefix", "ObjectRef.Subresource.keyword",
f.ObjectRefSubresources, nil); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if f.ResponseStatus != nil && len(f.ResponseStatus) > 0 {
bi := BoolBody{MinimumShouldMatch: &mini}
for _, v := range f.ResponseStatus {
bi.Should = append(bi.Should, map[string]interface{}{
"term": map[string]int32{"ResponseStatus.code": v},
})
}
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
if f.StartTime != nil || f.EndTime != nil {
m := make(map[string]*time.Time)
if f.StartTime != nil {
m["gte"] = f.StartTime
}
if f.EndTime != nil {
m["lte"] = f.EndTime
}
b.Filter = append(b.Filter, map[string]interface{}{
"range": map[string]interface{}{"RequestReceivedTimestamp": m},
})
}
return queryBody
}
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package elasticsearch
import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
)
func MockElasticsearchService(pattern string, fakeCode int, fakeResp string) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc(pattern, func(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(fakeCode)
_, _ = res.Write([]byte(fakeResp))
})
return httptest.NewServer(mux)
}
func TestStatisticsOnResources(t *testing.T) {
var tests = []struct {
description string
filter auditing.Filter
fakeVersion string
fakeCode int
fakeResp string
expected auditing.Statistics
expectedError bool
}{{
description: "ES index exists",
filter: auditing.Filter{},
fakeVersion: "6",
fakeCode: 200,
fakeResp: `
{
"took": 16,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 10000,
"max_score": null,
"hits": [
]
},
"aggregations": {
"resources_count": {
"value": 100
}
}
}
`,
expected: auditing.Statistics{
Events: 10000,
Resources: 100,
},
expectedError: false,
}, {
description: "ES index not exists",
filter: auditing.Filter{},
fakeVersion: "6",
fakeCode: 404,
fakeResp: `
{
"error": {
"root_cause": [
{
"type": "index_not_found_exception",
"reason": "no such index [events]",
"resource.type": "index_or_alias",
"resource.id": "events",
"index_uuid": "_na_",
"index": "events"
}
],
"type": "index_not_found_exception",
"reason": "no such index [events]",
"resource.type": "index_or_alias",
"resource.id": "events",
"index_uuid": "_na_",
"index": "events"
},
"status": 404
}
`,
expectedError: true,
}}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
mes := MockElasticsearchService("/", test.fakeCode, test.fakeResp)
defer mes.Close()
es, err := NewClient(&Options{Host: mes.URL, IndexPrefix: "ks-logstash-events", Version: "6"})
if err != nil {
t.Fatal(err)
}
stats, err := es.StatisticsOnResources(&test.filter)
if test.expectedError {
if err == nil {
t.Fatalf("expected err like %s", test.fakeResp)
} else if !strings.Contains(err.Error(), strconv.Itoa(test.fakeCode)) {
t.Fatalf("err does not contain expected code: %d", test.fakeCode)
}
} else {
if err != nil {
t.Fatal(err)
} else if diff := cmp.Diff(stats, &test.expected); diff != "" {
t.Fatalf("%T differ (-got, +want): %s", test.expected, diff)
}
}
})
}
}
func TestParseToQueryPart(t *testing.T) {
q := `
{
"bool": {
"filter": [
{
"bool": {
"should": [
{
"bool": {
"filter": [
{
"match_phrase": {
"ObjectRef.Namespace.keyword": "kubesphere-system"
}
},
{
"range": {
"RequestReceivedTimestamp": {
"gte": "2020-01-01T01:01:01.000000001Z"
}
}
}
]
}
}
],
"minimum_should_match": 1
}
},
{
"bool": {
"should": [
{
"wildcard": {
"ObjectRef.Name": "*istio*"
}
}
],
"minimum_should_match": 1
}
},
{
"range": {
"RequestReceivedTimestamp": {
"gte": "2019-12-01T01:01:01.000000001Z"
}
}
}
]
}
}
`
nsCreateTime := time.Date(2020, time.Month(1), 1, 1, 1, 1, 1, time.UTC)
startTime := nsCreateTime.AddDate(0, -1, 0)
filter := &auditing.Filter{
ObjectRefNamespaceMap: map[string]time.Time{
"kubesphere-system": nsCreateTime,
},
ObjectRefNameFuzzy: []string{"istio"},
StartTime: &startTime,
}
qp := parseToQueryPart(filter)
bs, err := json.Marshal(qp)
if err != nil {
panic(err)
}
queryPart := &map[string]interface{}{}
if err := json.Unmarshal(bs, queryPart); err != nil {
panic(err)
}
expectedQueryPart := &map[string]interface{}{}
if err := json.Unmarshal([]byte(q), expectedQueryPart); err != nil {
panic(err)
}
assert.Equal(t, expectedQueryPart, queryPart)
}
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package elasticsearch
import (
"github.com/spf13/pflag"
"kubesphere.io/kubesphere/pkg/utils/reflectutils"
)
type Options struct {
Host string `json:"host" yaml:"host"`
IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"`
Version string `json:"version" yaml:"version"`
}
func NewElasticSearchOptions() *Options {
return &Options{
Host: "http://elasticsearch-logging-data.kubesphere-logging-system.svc:9200",
IndexPrefix: "ks-logstash-auditing",
Version: "",
}
}
func (s *Options) ApplyTo(options *Options) {
if s.Host != "" {
reflectutils.Override(options, s)
}
}
func (s *Options) Validate() []error {
errs := make([]error, 0)
return errs
}
func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) {
fs.StringVar(&s.Host, "elasticsearch-host", c.Host, ""+
"Elasticsearch service host. KubeSphere is using elastic as auditing store, "+
"if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+
" the following elastic search options will be ignored.")
fs.StringVar(&s.IndexPrefix, "index-prefix", c.IndexPrefix, ""+
"Index name prefix. KubeSphere will retrieve auditing against indices matching the prefix.")
fs.StringVar(&s.Version, "elasticsearch-version", c.Version, ""+
"Elasticsearch major version, e.g. 5/6/7, if left blank, will detect automatically."+
"Currently, minimum supported version is 5.x")
}
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package auditing
import (
"time"
)
type Client interface {
SearchAuditingEvent(filter *Filter, from, size int64, sort string) (*Events, error)
CountOverTime(filter *Filter, interval string) (*Histogram, error)
StatisticsOnResources(filter *Filter) (*Statistics, error)
}
type Filter struct {
ObjectRefNamespaceMap map[string]time.Time
ObjectRefNames []string
ObjectRefNameFuzzy []string
Levels []string
Verbs []string
Users []string
UserFuzzy []string
GroupFuzzy []string
SourceIpFuzzy []string
ObjectRefResources []string
ObjectRefSubresources []string
ResponseStatus []int32
StartTime *time.Time
EndTime *time.Time
}
type Event map[string]interface{}
type Events struct {
Total int64 `json:"total" description:"total number of matched results"`
Records []*Event `json:"records" description:"actual array of results"`
}
type Histogram struct {
Total int64 `json:"total" description:"total number of events"`
Buckets []Bucket `json:"buckets" description:"actual array of histogram results"`
}
type Bucket struct {
Time int64 `json:"time" description:"timestamp"`
Count int64 `json:"count" description:"total number of events at intervals"`
}
type Statistics struct {
Resources int64 `json:"resources" description:"total number of resources"`
Events int64 `json:"events" description:"total number of events"`
}
......@@ -117,7 +117,7 @@ func generateSwaggerJson() []byte {
urlruntime.Must(operationsv1alpha2.AddToContainer(container, clientsets.Kubernetes()))
urlruntime.Must(resourcesv1alpha2.AddToContainer(container, clientsets.Kubernetes(), informerFactory))
urlruntime.Must(resourcesv1alpha3.AddToContainer(container, informerFactory))
urlruntime.Must(tenantv1alpha2.AddToContainer(container, informerFactory, nil, nil, nil, nil))
urlruntime.Must(tenantv1alpha2.AddToContainer(container, informerFactory, nil, nil, nil, nil, nil))
urlruntime.Must(terminalv1alpha2.AddToContainer(container, clientsets.Kubernetes(), nil))
urlruntime.Must(metricsv1alpha2.AddToContainer(container))
urlruntime.Must(networkv1alpha2.AddToContainer(container, ""))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册