diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index ab306fd878c193ed7e54d630ffd6ac1703479d02..392c9d0142b1785a8f38baffab4c92514fb5d60d 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -281,7 +281,7 @@ func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) { if s.Config.AuditingOptions.Enable { handler = filters.WithAuditing(handler, - audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions.WebhookUrl, stopCh)) + audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions, stopCh)) } var authorizers authorizer.Authorizer diff --git a/pkg/apiserver/auditing/backend.go b/pkg/apiserver/auditing/backend.go index 96041c8542f20e3ab470036782f432b5189273d5..078010156a2fb2b4ea1bb08be0616c84b8048bb2 100644 --- a/pkg/apiserver/auditing/backend.go +++ b/pkg/apiserver/auditing/backend.go @@ -23,42 +23,62 @@ import ( "encoding/json" "k8s.io/klog" "kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1" + options "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch" "net/http" "time" ) const ( - WaitTimeout = time.Second - WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event" + GetSenderTimeout = time.Second + SendTimeout = time.Second * 3 + DefaultSendersNum = 100 + DefaultBatchSize = 100 + DefaultBatchInterval = time.Second * 3 + WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event" ) type Backend struct { - url string - channelCapacity int - semCh chan interface{} - cache chan *v1alpha1.EventList - client http.Client - sendTimeout time.Duration - waitTimeout time.Duration - stopCh <-chan struct{} + url string + senderCh chan interface{} + cache chan *v1alpha1.Event + client http.Client + sendTimeout time.Duration + getSenderTimeout time.Duration + eventBatchSize int + eventBatchInterval time.Duration + stopCh <-chan struct{} } -func NewBackend(url string, channelCapacity int, cache chan *v1alpha1.EventList, sendTimeout time.Duration, stopCh <-chan struct{}) *Backend { +func NewBackend(opts *options.Options, cache chan *v1alpha1.Event, stopCh <-chan struct{}) *Backend { b := Backend{ - url: url, - semCh: make(chan interface{}, channelCapacity), - channelCapacity: channelCapacity, - waitTimeout: WaitTimeout, - cache: cache, - sendTimeout: sendTimeout, - stopCh: stopCh, + url: opts.WebhookUrl, + getSenderTimeout: GetSenderTimeout, + cache: cache, + sendTimeout: SendTimeout, + eventBatchSize: opts.EventBatchSize, + eventBatchInterval: opts.EventBatchInterval, + stopCh: stopCh, } if len(b.url) == 0 { b.url = WebhookURL } + if b.eventBatchInterval == 0 { + b.eventBatchInterval = DefaultBatchInterval + } + + if b.eventBatchSize == 0 { + b.eventBatchSize = DefaultBatchSize + } + + sendersNum := opts.EventSendersNum + if sendersNum == 0 { + sendersNum = DefaultSendersNum + } + b.senderCh = make(chan interface{}, sendersNum) + b.client = http.Client{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{ @@ -76,53 +96,97 @@ func NewBackend(url string, channelCapacity int, cache chan *v1alpha1.EventList, func (b *Backend) worker() { for { + events := b.getEvents() + if events == nil { + break + } - var event *v1alpha1.EventList + if len(events.Items) == 0 { + continue + } + + go b.sendEvents(events) + } +} + +func (b *Backend) getEvents() *v1alpha1.EventList { + + ctx, cancel := context.WithTimeout(context.Background(), b.eventBatchInterval) + defer cancel() + + events := &v1alpha1.EventList{} + for { select { - case event = <-b.cache: + case event := <-b.cache: if event == nil { break } + events.Items = append(events.Items, *event) + if len(events.Items) >= b.eventBatchSize { + return events + } + case <-ctx.Done(): + return events case <-b.stopCh: - break + return nil } + } +} - send := func(event *v1alpha1.EventList) { - ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout) - defer cancel() +func (b *Backend) sendEvents(events *v1alpha1.EventList) { - select { - case <-ctx.Done(): - klog.Errorf("get goroutine for audit(%s) timeout", event.Items[0].AuditID) - return - case b.semCh <- struct{}{}: - } + ctx, cancel := context.WithTimeout(context.Background(), b.sendTimeout) + defer cancel() - defer func() { - <-b.semCh - }() + stopCh := make(chan struct{}) - bs, err := b.eventToBytes(event) - if err != nil { - klog.V(6).Infof("json marshal error, %s", err) - return - } + send := func() { + ctx, cancel := context.WithTimeout(context.Background(), b.getSenderTimeout) + defer cancel() - klog.V(8).Infof("%s", string(bs)) + select { + case <-ctx.Done(): + klog.Error("Get auditing event sender timeout") + return + case b.senderCh <- struct{}{}: + } - response, err := b.client.Post(b.url, "application/json", bytes.NewBuffer(bs)) - if err != nil { - klog.Errorf("send audit event[%s] error, %s", event.Items[0].AuditID, err) - return - } + start := time.Now() + defer func() { + stopCh <- struct{}{} + klog.V(8).Infof("send %d auditing logs used %d", len(events.Items), time.Now().Sub(start).Milliseconds()) + }() - if response.StatusCode != http.StatusOK { - klog.Errorf("send audit event[%s] error[%d]", event.Items[0].AuditID, response.StatusCode) - return - } + bs, err := b.eventToBytes(events) + if err != nil { + klog.Errorf("json marshal error, %s", err) + return } - go send(event) + klog.V(8).Infof("%s", string(bs)) + + response, err := b.client.Post(b.url, "application/json", bytes.NewBuffer(bs)) + if err != nil { + klog.Errorf("send audit events error, %s", err) + return + } + + if response.StatusCode != http.StatusOK { + klog.Errorf("send audit events error[%d]", response.StatusCode) + return + } + } + + go send() + + defer func() { + <-b.senderCh + }() + + select { + case <-ctx.Done(): + klog.Error("send audit events timeout") + case <-stopCh: } } diff --git a/pkg/apiserver/auditing/types.go b/pkg/apiserver/auditing/types.go index d6dd0b08cca76743b32ec7d571a7135691015ebd..049fa647fefecd36d41e19f2c49c96c77bb2ad90 100644 --- a/pkg/apiserver/auditing/types.go +++ b/pkg/apiserver/auditing/types.go @@ -36,6 +36,7 @@ import ( "kubesphere.io/kubesphere/pkg/informers" "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3" "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/devops" + options "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch" "kubesphere.io/kubesphere/pkg/utils/iputil" "net" "net/http" @@ -46,8 +47,6 @@ const ( DefaultWebhook = "kube-auditing-webhook" DefaultCacheCapacity = 10000 CacheTimeout = time.Second - SendTimeout = time.Second * 3 - ChannelCapacity = 10 ) type Auditing interface { @@ -60,19 +59,19 @@ type Auditing interface { type auditing struct { webhookLister v1alpha1.WebhookLister devopsGetter v1alpha3.Interface - cache chan *auditv1alpha1.EventList + cache chan *auditv1alpha1.Event backend *Backend } -func NewAuditing(informers informers.InformerFactory, url string, stopCh <-chan struct{}) Auditing { +func NewAuditing(informers informers.InformerFactory, opts *options.Options, stopCh <-chan struct{}) Auditing { a := &auditing{ webhookLister: informers.KubeSphereSharedInformerFactory().Auditing().V1alpha1().Webhooks().Lister(), devopsGetter: devops.New(informers.KubeSphereSharedInformerFactory()), - cache: make(chan *auditv1alpha1.EventList, DefaultCacheCapacity), + cache: make(chan *auditv1alpha1.Event, DefaultCacheCapacity), } - a.backend = NewBackend(url, ChannelCapacity, a.cache, SendTimeout, stopCh) + a.backend = NewBackend(opts, a.cache, stopCh) return a } @@ -226,13 +225,11 @@ func (a *auditing) LogResponseObject(e *auditv1alpha1.Event, resp *ResponseCaptu func (a *auditing) cacheEvent(e auditv1alpha1.Event) { - eventList := &auditv1alpha1.EventList{} - eventList.Items = append(eventList.Items, e) select { - case a.cache <- eventList: + case a.cache <- &e: return case <-time.After(CacheTimeout): - klog.Errorf("cache audit event %s timeout", e.AuditID) + klog.V(8).Infof("cache audit event %s timeout", e.AuditID) break } } diff --git a/pkg/simple/client/auditing/elasticsearch/options.go b/pkg/simple/client/auditing/elasticsearch/options.go index 9da5ac9963f76bd9b7687464e5f413afbed4d3a2..161ed30ac67178ae32b03106c856be928c2008d7 100644 --- a/pkg/simple/client/auditing/elasticsearch/options.go +++ b/pkg/simple/client/auditing/elasticsearch/options.go @@ -19,14 +19,21 @@ package elasticsearch import ( "github.com/spf13/pflag" "kubesphere.io/kubesphere/pkg/utils/reflectutils" + "time" ) type Options struct { - Enable bool `json:"enable" yaml:"enable"` - WebhookUrl string `json:"webhookUrl" yaml:"webhookUrl"` - Host string `json:"host" yaml:"host"` - IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"` - Version string `json:"version" yaml:"version"` + Enable bool `json:"enable" yaml:"enable"` + WebhookUrl string `json:"webhookUrl" yaml:"webhookUrl"` + // The maximum concurrent senders which send auditing events to the auditing webhook. + EventSendersNum int `json:"eventSendersNum" yaml:"eventSendersNum"` + // The batch size of auditing events. + EventBatchSize int `json:"eventBatchSize" yaml:"eventBatchSize"` + // The batch interval of auditing events. + EventBatchInterval time.Duration `json:"eventBatchInterval" yaml:"eventBatchInterval"` + Host string `json:"host" yaml:"host"` + IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"` + Version string `json:"version" yaml:"version"` } func NewElasticSearchOptions() *Options { @@ -52,7 +59,12 @@ func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) { fs.BoolVar(&s.Enable, "auditing-enabled", c.Enable, "Enable auditing component or not. ") fs.StringVar(&s.WebhookUrl, "auditing-webhook-url", c.WebhookUrl, "Auditing wehook url") - + fs.IntVar(&s.EventSendersNum, "auditing-event-senders-num", c.EventSendersNum, + "The maximum concurrent senders which send auditing events to the auditing webhook.") + fs.IntVar(&s.EventBatchSize, "auditing-event-batch-size", c.EventBatchSize, + "The batch size of auditing events.") + fs.DurationVar(&s.EventBatchInterval, "auditing-event-batch-interval", c.EventBatchInterval, + "The batch interval of auditing events.") fs.StringVar(&s.Host, "auditing-elasticsearch-host", c.Host, ""+ "Elasticsearch service host. KubeSphere is using elastic as auditing store, "+ "if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+