// Package auditing provides a Backend that forwards audit event lists to a
// webhook over HTTP.
package auditing

import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/json"
	"net/http"
	"time"

	"k8s.io/klog"
	"kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1"
)

const (
	// WaitTimeout is the default time a pending send waits for a free slot
	// before the event list is dropped.
	WaitTimeout = time.Second

	// WebhookURL is the endpoint used when NewBackend is given an empty url.
	WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event"
)

type Backend struct {
W
wanjunlei 已提交
20
	url             string
W
wanjunlei 已提交
21 22
	channelCapacity int
	semCh           chan interface{}
23
	cache           chan *v1alpha1.EventList
W
wanjunlei 已提交
24 25 26
	client          http.Client
	sendTimeout     time.Duration
	waitTimeout     time.Duration
W
wanjunlei 已提交
27
	stopCh          <-chan struct{}
W
wanjunlei 已提交
28 29
}

30
func NewBackend(url string, channelCapacity int, cache chan *v1alpha1.EventList, sendTimeout time.Duration, stopCh <-chan struct{}) *Backend {
W
wanjunlei 已提交
31 32

	b := Backend{
W
wanjunlei 已提交
33
		url:             url,
W
wanjunlei 已提交
34 35 36 37 38
		semCh:           make(chan interface{}, channelCapacity),
		channelCapacity: channelCapacity,
		waitTimeout:     WaitTimeout,
		cache:           cache,
		sendTimeout:     sendTimeout,
W
wanjunlei 已提交
39 40 41 42 43
		stopCh:          stopCh,
	}

	if len(b.url) == 0 {
		b.url = WebhookURL
W
wanjunlei 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
	}

	b.client = http.Client{
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				InsecureSkipVerify: true,
			},
		},
		Timeout: b.sendTimeout,
	}

	go b.worker()

	return &b
}

func (b *Backend) worker() {

	for {

64
		var event *v1alpha1.EventList
W
wanjunlei 已提交
65 66 67 68 69
		select {
		case event = <-b.cache:
			if event == nil {
				break
			}
W
wanjunlei 已提交
70
		case <-b.stopCh:
W
wanjunlei 已提交
71 72 73
			break
		}

74
		send := func(event *v1alpha1.EventList) {
W
wanjunlei 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87 88
			ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout)
			defer cancel()

			select {
			case <-ctx.Done():
				klog.Errorf("get goroutine for audit(%s) timeout", event.Items[0].AuditID)
				return
			case b.semCh <- struct{}{}:
			}

			defer func() {
				<-b.semCh
			}()

89
			bs, err := b.eventToBytes(event)
W
wanjunlei 已提交
90
			if err != nil {
91
				klog.V(6).Infof("json marshal error, %s", err)
W
wanjunlei 已提交
92 93 94
				return
			}

95 96
			klog.V(8).Infof("%s", string(bs))

W
wanjunlei 已提交
97
			response, err := b.client.Post(b.url, "application/json", bytes.NewBuffer(bs))
W
wanjunlei 已提交
98 99 100 101 102 103 104 105 106 107 108 109 110 111
			if err != nil {
				klog.Errorf("send audit event[%s] error, %s", event.Items[0].AuditID, err)
				return
			}

			if response.StatusCode != http.StatusOK {
				klog.Errorf("send audit event[%s] error[%d]", event.Items[0].AuditID, response.StatusCode)
				return
			}
		}

		go send(event)
	}
}
112 113 114

func (b *Backend) eventToBytes(event *v1alpha1.EventList) ([]byte, error) {

W
wanjunlei 已提交
115 116 117 118 119 120
	bs, err := json.Marshal(event)
	if err != nil {
		// Normally, the serialization failure is caused by the failure of ResponseObject serialization.
		// To ensure the integrity of the auditing event to the greatest extent,
		// it is necessary to delete ResponseObject and and then try to serialize again.
		if event.Items[0].ResponseObject != nil {
121
			event.Items[0].ResponseObject = nil
W
wanjunlei 已提交
122
			return json.Marshal(event)
123
		}
W
wanjunlei 已提交
124 125

		return nil, err
126 127
	}

W
wanjunlei 已提交
128
	return bs, err
129
}