diff --git a/pkg/apiserver/auditing/backend.go b/pkg/apiserver/auditing/backend.go index eb5c01099a4bb23515e3fdf6d25d9a809ec245a2..078010156a2fb2b4ea1bb08be0616c84b8048bb2 100644 --- a/pkg/apiserver/auditing/backend.go +++ b/pkg/apiserver/auditing/backend.go @@ -29,55 +29,55 @@ import ( ) const ( - WaitTimeout = time.Second + GetSenderTimeout = time.Second SendTimeout = time.Second * 3 - DefaultGoroutinesNum = 100 + DefaultSendersNum = 100 DefaultBatchSize = 100 - DefaultBatchWait = time.Second * 3 + DefaultBatchInterval = time.Second * 3 WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event" ) type Backend struct { - url string - semCh chan interface{} - cache chan *v1alpha1.Event - client http.Client - sendTimeout time.Duration - waitTimeout time.Duration - maxBatchSize int - maxBatchWait time.Duration - stopCh <-chan struct{} + url string + senderCh chan interface{} + cache chan *v1alpha1.Event + client http.Client + sendTimeout time.Duration + getSenderTimeout time.Duration + eventBatchSize int + eventBatchInterval time.Duration + stopCh <-chan struct{} } func NewBackend(opts *options.Options, cache chan *v1alpha1.Event, stopCh <-chan struct{}) *Backend { b := Backend{ - url: opts.WebhookUrl, - waitTimeout: WaitTimeout, - cache: cache, - sendTimeout: SendTimeout, - maxBatchSize: opts.MaxBatchSize, - maxBatchWait: opts.MaxBatchWait, - stopCh: stopCh, + url: opts.WebhookUrl, + getSenderTimeout: GetSenderTimeout, + cache: cache, + sendTimeout: SendTimeout, + eventBatchSize: opts.EventBatchSize, + eventBatchInterval: opts.EventBatchInterval, + stopCh: stopCh, } if len(b.url) == 0 { b.url = WebhookURL } - if b.maxBatchWait == 0 { - b.maxBatchWait = DefaultBatchWait + if b.eventBatchInterval == 0 { + b.eventBatchInterval = DefaultBatchInterval } - if b.maxBatchSize == 0 { - b.maxBatchSize = DefaultBatchSize + if b.eventBatchSize == 0 { + b.eventBatchSize = DefaultBatchSize } - goroutinesNum := opts.GoroutinesNum - if goroutinesNum == 0 { - goroutinesNum = DefaultGoroutinesNum + sendersNum := opts.EventSendersNum + if sendersNum == 0 { + sendersNum = DefaultSendersNum } - b.semCh = make(chan interface{}, goroutinesNum) + b.senderCh = make(chan interface{}, sendersNum) b.client = http.Client{ Transport: &http.Transport{ @@ -111,7 +111,7 @@ func (b *Backend) worker() { func (b *Backend) getEvents() *v1alpha1.EventList { - ctx, cancel := context.WithTimeout(context.Background(), b.maxBatchWait) + ctx, cancel := context.WithTimeout(context.Background(), b.eventBatchInterval) defer cancel() events := &v1alpha1.EventList{} @@ -122,7 +122,7 @@ func (b *Backend) getEvents() *v1alpha1.EventList { break } events.Items = append(events.Items, *event) - if len(events.Items) >= b.maxBatchSize { + if len(events.Items) >= b.eventBatchSize { return events } case <-ctx.Done(): @@ -141,14 +141,14 @@ func (b *Backend) sendEvents(events *v1alpha1.EventList) { stopCh := make(chan struct{}) send := func() { - ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout) + ctx, cancel := context.WithTimeout(context.Background(), b.getSenderTimeout) defer cancel() select { case <-ctx.Done(): - klog.Error("get goroutine timeout") + klog.Error("Get auditing event sender timeout") return - case b.semCh <- struct{}{}: + case b.senderCh <- struct{}{}: } start := time.Now() @@ -159,7 +159,7 @@ func (b *Backend) sendEvents(events *v1alpha1.EventList) { bs, err := b.eventToBytes(events) if err != nil { - klog.V(6).Infof("json marshal error, %s", err) + klog.Errorf("json marshal error, %s", err) return } @@ -180,7 +180,7 @@ func (b *Backend) sendEvents(events *v1alpha1.EventList) { go send() defer func() { - <-b.semCh + <-b.senderCh }() select { diff --git a/pkg/apiserver/auditing/types.go b/pkg/apiserver/auditing/types.go index 4ffc12b75eb12c4cbbce3b45ceb28df761a80cdb..049fa647fefecd36d41e19f2c49c96c77bb2ad90 100644 --- a/pkg/apiserver/auditing/types.go +++ b/pkg/apiserver/auditing/types.go @@ -229,7 +229,7 @@ func (a *auditing) cacheEvent(e auditv1alpha1.Event) { case a.cache <- &e: return case <-time.After(CacheTimeout): - klog.Errorf("cache audit event %s timeout", e.AuditID) + klog.V(8).Infof("cache audit event %s timeout", e.AuditID) break } } diff --git a/pkg/simple/client/auditing/elasticsearch/options.go b/pkg/simple/client/auditing/elasticsearch/options.go index 2b6063a00e08b0cc0cd636a92c036b583746c430..161ed30ac67178ae32b03106c856be928c2008d7 100644 --- a/pkg/simple/client/auditing/elasticsearch/options.go +++ b/pkg/simple/client/auditing/elasticsearch/options.go @@ -25,15 +25,15 @@ import ( type Options struct { Enable bool `json:"enable" yaml:"enable"` WebhookUrl string `json:"webhookUrl" yaml:"webhookUrl"` - // The number of goroutines which send auditing events to webhook. - GoroutinesNum int `json:"goroutinesNum" yaml:"goroutinesNum"` - // The max size of the auditing event in a batch. - MaxBatchSize int `json:"batchSize" yaml:"batchSize"` - // MaxBatchWait indicates the maximum interval between two batches. - MaxBatchWait time.Duration `json:"batchTimeout" yaml:"batchTimeout"` - Host string `json:"host" yaml:"host"` - IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"` - Version string `json:"version" yaml:"version"` + // The maximum concurrent senders which send auditing events to the auditing webhook. + EventSendersNum int `json:"eventSendersNum" yaml:"eventSendersNum"` + // The batch size of auditing events. + EventBatchSize int `json:"eventBatchSize" yaml:"eventBatchSize"` + // The batch interval of auditing events. + EventBatchInterval time.Duration `json:"eventBatchInterval" yaml:"eventBatchInterval"` + Host string `json:"host" yaml:"host"` + IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"` + Version string `json:"version" yaml:"version"` } func NewElasticSearchOptions() *Options { @@ -59,12 +59,12 @@ func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) { fs.BoolVar(&s.Enable, "auditing-enabled", c.Enable, "Enable auditing component or not. ") fs.StringVar(&s.WebhookUrl, "auditing-webhook-url", c.WebhookUrl, "Auditing wehook url") - fs.IntVar(&s.GoroutinesNum, "auditing-goroutines-num", c.GoroutinesNum, - "The number of goroutines which send auditing events to webhook.") - fs.IntVar(&s.MaxBatchSize, "auditing-batch-max-size", c.MaxBatchSize, - "The max size of the auditing event in a batch.") - fs.DurationVar(&s.MaxBatchWait, "auditing-batch-max-wait", c.MaxBatchWait, - "MaxBatchWait indicates the maximum interval between two batches.") + fs.IntVar(&s.EventSendersNum, "auditing-event-senders-num", c.EventSendersNum, + "The maximum concurrent senders which send auditing events to the auditing webhook.") + fs.IntVar(&s.EventBatchSize, "auditing-event-batch-size", c.EventBatchSize, + "The batch size of auditing events.") + fs.DurationVar(&s.EventBatchInterval, "auditing-event-batch-interval", c.EventBatchInterval, + "The batch interval of auditing events.") fs.StringVar(&s.Host, "auditing-elasticsearch-host", c.Host, ""+ "Elasticsearch service host. KubeSphere is using elastic as auditing store, "+ "if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+