From 584bdda50e9c247932d1bc2dc399dfdf83ee40af Mon Sep 17 00:00:00 2001 From: wanjunlei Date: Sat, 13 Jun 2020 17:13:55 +0800 Subject: [PATCH] Add two elements Message and Devops into Event struct. Pass the event object instead of RequestInfo by request context to request handler. --- pkg/apiserver/auditing/backend.go | 9 +-- pkg/apiserver/auditing/types.go | 86 ++++++++++-------------- pkg/apiserver/auditing/types_test.go | 36 ++++++++-- pkg/apiserver/auditing/v1alpha1/event.go | 20 ++++++ pkg/apiserver/filters/auditing.go | 10 +-- pkg/apiserver/request/context.go | 8 +-- 6 files changed, 102 insertions(+), 67 deletions(-) create mode 100644 pkg/apiserver/auditing/v1alpha1/event.go diff --git a/pkg/apiserver/auditing/backend.go b/pkg/apiserver/auditing/backend.go index 87287070..fd6a5cdb 100644 --- a/pkg/apiserver/auditing/backend.go +++ b/pkg/apiserver/auditing/backend.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "encoding/json" "k8s.io/klog" + "kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1" "net/http" "time" ) @@ -19,14 +20,14 @@ type Backend struct { url string channelCapacity int semCh chan interface{} - cache chan *EventList + cache chan *v1alpha1.EventList client http.Client sendTimeout time.Duration waitTimeout time.Duration stopCh <-chan struct{} } -func NewBackend(url string, channelCapacity int, cache chan *EventList, sendTimeout time.Duration, stopCh <-chan struct{}) *Backend { +func NewBackend(url string, channelCapacity int, cache chan *v1alpha1.EventList, sendTimeout time.Duration, stopCh <-chan struct{}) *Backend { b := Backend{ url: url, @@ -60,7 +61,7 @@ func (b *Backend) worker() { for { - var event *EventList + var event *v1alpha1.EventList select { case event = <-b.cache: if event == nil { @@ -70,7 +71,7 @@ func (b *Backend) worker() { break } - send := func(event *EventList) { + send := func(event *v1alpha1.EventList) { ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout) defer cancel() diff --git a/pkg/apiserver/auditing/types.go b/pkg/apiserver/auditing/types.go index 2700c753..775c84c8 100644 --- a/pkg/apiserver/auditing/types.go +++ b/pkg/apiserver/auditing/types.go @@ -10,6 +10,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/apis/audit" "k8s.io/klog" + auditv1alpha1 "kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1" "kubesphere.io/kubesphere/pkg/apiserver/request" "kubesphere.io/kubesphere/pkg/client/listers/auditing/v1alpha1" "kubesphere.io/kubesphere/pkg/utils/iputil" @@ -28,26 +29,13 @@ const ( type Auditing interface { Enabled() bool K8sAuditingEnabled() bool - LogRequestObject(req *http.Request) *Event - LogResponseObject(e *Event, resp *ResponseCapture, info *request.RequestInfo) -} - -type Event struct { - //The workspace which this audit event happened - Workspace string - //The cluster which this audit event happened - Cluster string - - audit.Event -} - -type EventList struct { - Items []Event + LogRequestObject(req *http.Request, info *request.RequestInfo) *auditv1alpha1.Event + LogResponseObject(e *auditv1alpha1.Event, resp *ResponseCapture, info *request.RequestInfo) } type auditing struct { lister v1alpha1.WebhookLister - cache chan *EventList + cache chan *auditv1alpha1.EventList backend *Backend } @@ -55,7 +43,7 @@ func NewAuditing(lister v1alpha1.WebhookLister, url string, stopCh <-chan struct a := &auditing{ lister: lister, - cache: make(chan *EventList, DefaultCacheCapacity), + cache: make(chan *auditv1alpha1.EventList, DefaultCacheCapacity), } a.backend = NewBackend(url, ChannelCapacity, a.cache, SendTimeout, stopCh) @@ -91,9 +79,26 @@ func (a *auditing) K8sAuditingEnabled() bool { return wh.Spec.K8sAuditingEnabled } -func (a *auditing) LogRequestObject(req *http.Request) *Event { - e := &Event{ +// If the request is not a standard request, or a resource request, +// or part of the audit information cannot be obtained through url, +// the function that handles the request can obtain Event from +// the context of the request, assign value to audit information, +// including name, verb, resource, subresource, message etc like this. +// +// info, ok := request.AuditEventFrom(request.Request.Context()) +// if ok { +// info.Verb = "post" +// info.Name = created.Name +// } +// +func (a *auditing) LogRequestObject(req *http.Request, info *request.RequestInfo) *auditv1alpha1.Event { + + e := &auditv1alpha1.Event{ + Workspace: info.Workspace, + Cluster: info.Cluster, Event: audit.Event{ + RequestURI: info.Path, + Verb: info.Verb, Level: a.getAuditLevel(), AuditID: types.UID(uuid.New().String()), Stage: audit.StageResponseComplete, @@ -101,6 +106,16 @@ func (a *auditing) LogRequestObject(req *http.Request) *Event { UserAgent: req.UserAgent(), RequestReceivedTimestamp: v1.NewMicroTime(time.Now()), Annotations: nil, + ObjectRef: &audit.ObjectReference{ + Resource: info.Resource, + Namespace: info.Namespace, + Name: info.Name, + UID: "", + APIGroup: info.APIGroup, + APIVersion: info.APIVersion, + ResourceVersion: info.ResourceScope, + Subresource: info.Subresource, + }, }, } @@ -133,7 +148,7 @@ func (a *auditing) LogRequestObject(req *http.Request) *Event { return e } -func (a *auditing) LogResponseObject(e *Event, resp *ResponseCapture, info *request.RequestInfo) { +func (a *auditing) LogResponseObject(e *auditv1alpha1.Event, resp *ResponseCapture, info *request.RequestInfo) { // Auditing should igonre k8s request when k8s auditing is enabled. if info.IsKubernetesRequest && a.K8sAuditingEnabled() { @@ -146,43 +161,16 @@ func (a *auditing) LogResponseObject(e *Event, resp *ResponseCapture, info *requ e.ResponseObject = &runtime.Unknown{Raw: resp.Bytes()} } - // If the request is not a standard request, or a resource request, - // or part of the audit information cannot be obtained through url, - // the function that handles the request can obtain RequestInfo from - // the context of the request, assign value to audit information, - // including name, verb, resource, subresource, etc like this. - // - // info, ok := request.RequestInfoFrom(request.Request.Context()) - // if ok { - // info.Verb = "post" - // info.Name = created.Name - // } - // - e.Workspace = info.Workspace - e.Cluster = info.Cluster - e.RequestURI = info.Path - e.Verb = info.Verb - e.ObjectRef = &audit.ObjectReference{ - Resource: info.Resource, - Namespace: info.Namespace, - Name: info.Name, - UID: "", - APIGroup: info.APIGroup, - APIVersion: info.APIVersion, - ResourceVersion: info.ResourceScope, - Subresource: info.Subresource, - } - a.cacheEvent(*e) } -func (a *auditing) cacheEvent(e Event) { +func (a *auditing) cacheEvent(e auditv1alpha1.Event) { if klog.V(8) { bs, _ := json.Marshal(e) klog.Infof("%s", string(bs)) } - eventList := &EventList{} + eventList := &auditv1alpha1.EventList{} eventList.Items = append(eventList.Items, e) select { case a.cache <- eventList: diff --git a/pkg/apiserver/auditing/types_test.go b/pkg/apiserver/auditing/types_test.go index df9d02c1..2e5367bb 100644 --- a/pkg/apiserver/auditing/types_test.go +++ b/pkg/apiserver/auditing/types_test.go @@ -10,6 +10,7 @@ import ( "k8s.io/apiserver/pkg/authentication/user" k8srequest "k8s.io/apiserver/pkg/endpoints/request" auditingv1alpha1 "kubesphere.io/kubesphere/pkg/apis/auditing/v1alpha1" + v1alpha12 "kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1" "kubesphere.io/kubesphere/pkg/apiserver/request" "kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake" ksinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions" @@ -145,12 +146,25 @@ func TestAuditing_LogRequestObject(t *testing.T) { }, })) - e := a.LogRequestObject(req) + info := &request.RequestInfo{ + RequestInfo: &k8srequest.RequestInfo{ + IsResourceRequest: false, + Path: "/kapis/tenant.kubesphere.io/v1alpha2/workspaces", + Verb: "create", + APIGroup: "tenant.kubesphere.io", + APIVersion: "v1alpha2", + Resource: "workspaces", + Name: "test", + }, + } + + e := a.LogRequestObject(req, info) - expectedEvent := &Event{ + expectedEvent := &v1alpha12.Event{ Event: audit.Event{ AuditID: e.AuditID, Level: "RequestResponse", + Verb: "create", Stage: "ResponseComplete", User: v1.UserInfo{ Username: "admin", @@ -161,8 +175,18 @@ func TestAuditing_LogRequestObject(t *testing.T) { SourceIPs: []string{ "192.168.0.2", }, - + RequestURI: "/kapis/tenant.kubesphere.io/v1alpha2/workspaces", RequestReceivedTimestamp: e.RequestReceivedTimestamp, + ObjectRef: &audit.ObjectReference{ + Resource: "workspaces", + Namespace: "", + Name: "test", + UID: "", + APIGroup: "tenant.kubesphere.io", + APIVersion: "v1alpha2", + ResourceVersion: "", + Subresource: "", + }, }, } @@ -210,8 +234,6 @@ func TestAuditing_LogResponseObject(t *testing.T) { }, })) - e := a.LogRequestObject(req) - info := &request.RequestInfo{ RequestInfo: &k8srequest.RequestInfo{ IsResourceRequest: false, @@ -224,12 +246,14 @@ func TestAuditing_LogResponseObject(t *testing.T) { }, } + e := a.LogRequestObject(req, info) + resp := &ResponseCapture{} resp.WriteHeader(200) a.LogResponseObject(e, resp, info) - expectedEvent := &Event{ + expectedEvent := &v1alpha12.Event{ Event: audit.Event{ Verb: "create", AuditID: e.AuditID, diff --git a/pkg/apiserver/auditing/v1alpha1/event.go b/pkg/apiserver/auditing/v1alpha1/event.go new file mode 100644 index 00000000..f71fb565 --- /dev/null +++ b/pkg/apiserver/auditing/v1alpha1/event.go @@ -0,0 +1,20 @@ +package v1alpha1 + +import "k8s.io/apiserver/pkg/apis/audit" + +type Event struct { + // Devops project + Devops string + // The workspace which this audit event happened + Workspace string + // The cluster which this audit event happened + Cluster string + // Message send to user.s + Message string + + audit.Event +} + +type EventList struct { + Items []Event +} diff --git a/pkg/apiserver/filters/auditing.go b/pkg/apiserver/filters/auditing.go index 28b46ef8..17a73427 100644 --- a/pkg/apiserver/filters/auditing.go +++ b/pkg/apiserver/filters/auditing.go @@ -19,16 +19,18 @@ func WithAuditing(handler http.Handler, a auditing.Auditing) http.Handler { return } - e := a.LogRequestObject(req) - resp := auditing.NewResponseCapture(w) - handler.ServeHTTP(resp, req) - info, ok := request.RequestInfoFrom(req.Context()) if !ok { klog.Error("Unable to retrieve request info from request") + handler.ServeHTTP(w, req) return } + e := a.LogRequestObject(req, info) + req = req.WithContext(request.WithAuditEvent(req.Context(), e)) + resp := auditing.NewResponseCapture(w) + handler.ServeHTTP(resp, req) + go a.LogResponseObject(e, resp, info) }) } diff --git a/pkg/apiserver/request/context.go b/pkg/apiserver/request/context.go index d10bf107..41154176 100644 --- a/pkg/apiserver/request/context.go +++ b/pkg/apiserver/request/context.go @@ -18,9 +18,9 @@ package request import ( "context" + "kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apiserver/pkg/apis/audit" "k8s.io/apiserver/pkg/authentication/user" ) @@ -87,12 +87,12 @@ func UserFrom(ctx context.Context) (user.Info, bool) { } // WithAuditEvent returns set audit event struct. -func WithAuditEvent(parent context.Context, ev *audit.Event) context.Context { +func WithAuditEvent(parent context.Context, ev *v1alpha1.Event) context.Context { return WithValue(parent, auditKey, ev) } // AuditEventFrom returns the audit event struct on the ctx -func AuditEventFrom(ctx context.Context) *audit.Event { - ev, _ := ctx.Value(auditKey).(*audit.Event) +func AuditEventFrom(ctx context.Context) *v1alpha1.Event { + ev, _ := ctx.Value(auditKey).(*v1alpha1.Event) return ev } -- GitLab