diff --git a/cmd/ks-apiserver/app/options/options.go b/cmd/ks-apiserver/app/options/options.go index 81fd77859ebaab41e575cd2302a27228d6f537ac..0d215a93a558d90d780ce3576c77c994b44dfdce 100644 --- a/cmd/ks-apiserver/app/options/options.go +++ b/cmd/ks-apiserver/app/options/options.go @@ -119,7 +119,7 @@ func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIS } if s.LoggingOptions.Host != "" { - loggingClient, err := esclient.NewElasticsearch(s.LoggingOptions) + loggingClient, err := esclient.NewClient(s.LoggingOptions) if err != nil { return nil, fmt.Errorf("failed to connect to elasticsearch, please check elasticsearch status, error: %v", err) } diff --git a/pkg/api/auditing/v1alpha1/types.go b/pkg/api/auditing/v1alpha1/types.go index e791da8fa40a94941f1035a172edeabefdf747fb..ab521bbb006cf56d4d109f9a90dd2c511c02bea2 100644 --- a/pkg/api/auditing/v1alpha1/types.go +++ b/pkg/api/auditing/v1alpha1/types.go @@ -48,8 +48,8 @@ type Query struct { ResponseCodeFilter string `json:"response_code_filter,omitempty"` ResponseStatusFilter string `json:"response_status_filter,omitempty"` - StartTime *time.Time `json:"start_time,omitempty"` - EndTime *time.Time `json:"end_time,omitempty"` + StartTime time.Time `json:"start_time,omitempty"` + EndTime time.Time `json:"end_time,omitempty"` Interval string `json:"interval,omitempty"` Sort string `json:"sort,omitempty"` @@ -84,7 +84,7 @@ func ParseQueryParameter(req *restful.Request) (*Query, error) { return nil, err } t := time.Unix(sec, 0) - q.StartTime = &t + q.StartTime = t } if tstr := req.QueryParameter("end_time"); tstr != "" { sec, err := strconv.ParseInt(tstr, 10, 64) @@ -92,7 +92,7 @@ func ParseQueryParameter(req *restful.Request) (*Query, error) { return nil, err } t := time.Unix(sec, 0) - q.EndTime = &t + q.EndTime = t } if q.Interval = req.QueryParameter("interval"); q.Interval == "" { q.Interval = "15m" diff --git a/pkg/api/events/v1alpha1/types.go b/pkg/api/events/v1alpha1/types.go index 336816e1a0e4fa0473282099e63cb27f58f7385c..f73393217691c0d6495ec0246a99c027537927c4 100644 --- a/pkg/api/events/v1alpha1/types.go +++ b/pkg/api/events/v1alpha1/types.go @@ -43,8 +43,8 @@ type Query struct { MessageSearch string `json:"message_search,omitempty"` TypeFilter string `json:"type_filter,omitempty"` - StartTime *time.Time `json:"start_time,omitempty"` - EndTime *time.Time `json:"end_time,omitempty"` + StartTime time.Time `json:"start_time,omitempty"` + EndTime time.Time `json:"end_time,omitempty"` Interval string `json:"interval,omitempty"` Sort string `json:"sort,omitempty"` @@ -74,7 +74,7 @@ func ParseQueryParameter(req *restful.Request) (*Query, error) { return nil, err } t := time.Unix(sec, 0) - q.StartTime = &t + q.StartTime = t } if tstr := req.QueryParameter("end_time"); tstr != "" { sec, err := strconv.ParseInt(tstr, 10, 64) @@ -82,7 +82,7 @@ func ParseQueryParameter(req *restful.Request) (*Query, error) { return nil, err } t := time.Unix(sec, 0) - q.EndTime = &t + q.EndTime = t } if q.Interval = req.QueryParameter("interval"); q.Interval == "" { q.Interval = "15m" diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index e056f3ba65b63273e4e89b82a65a26367dd46c9b..1d1de3b863d1411bf3009308fbb4da8be592b7de 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -130,7 +130,7 @@ type APIServer struct { OpenpitrixClient openpitrix.Client // - LoggingClient logging.Interface + LoggingClient logging.Client // DevopsClient devops.Interface diff --git a/pkg/apiserver/config/config.go b/pkg/apiserver/config/config.go index 201e6b4057a6cf8ca2fe2834cbcdb58237bd45e5..4426ad1eea56f35d9b7ef548d49c282fbaa54b1d 100644 --- a/pkg/apiserver/config/config.go +++ b/pkg/apiserver/config/config.go @@ -22,13 +22,13 @@ import ( authoptions "kubesphere.io/kubesphere/pkg/apiserver/authentication/options" authorizationoptions "kubesphere.io/kubesphere/pkg/apiserver/authorization/options" "kubesphere.io/kubesphere/pkg/simple/client/alerting" - auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch" + "kubesphere.io/kubesphere/pkg/simple/client/auditing" "kubesphere.io/kubesphere/pkg/simple/client/cache" "kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins" - eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events/elasticsearch" + "kubesphere.io/kubesphere/pkg/simple/client/events" "kubesphere.io/kubesphere/pkg/simple/client/k8s" "kubesphere.io/kubesphere/pkg/simple/client/ldap" - "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch" + "kubesphere.io/kubesphere/pkg/simple/client/logging" "kubesphere.io/kubesphere/pkg/simple/client/monitoring/prometheus" "kubesphere.io/kubesphere/pkg/simple/client/multicluster" "kubesphere.io/kubesphere/pkg/simple/client/network" @@ -88,12 +88,12 @@ type Config struct { S3Options *s3.Options `json:"s3,omitempty" yaml:"s3,omitempty" mapstructure:"s3"` OpenPitrixOptions *openpitrix.Options `json:"openpitrix,omitempty" yaml:"openpitrix,omitempty" mapstructure:"openpitrix"` MonitoringOptions *prometheus.Options `json:"monitoring,omitempty" yaml:"monitoring,omitempty" mapstructure:"monitoring"` - LoggingOptions *elasticsearch.Options `json:"logging,omitempty" yaml:"logging,omitempty" mapstructure:"logging"` + LoggingOptions *logging.Options `json:"logging,omitempty" yaml:"logging,omitempty" mapstructure:"logging"` AuthenticationOptions *authoptions.AuthenticationOptions `json:"authentication,omitempty" yaml:"authentication,omitempty" mapstructure:"authentication"` AuthorizationOptions *authorizationoptions.AuthorizationOptions `json:"authorization,omitempty" yaml:"authorization,omitempty" mapstructure:"authorization"` MultiClusterOptions *multicluster.Options `json:"multicluster,omitempty" yaml:"multicluster,omitempty" mapstructure:"multicluster"` - EventsOptions *eventsclient.Options `json:"events,omitempty" yaml:"events,omitempty" mapstructure:"events"` - AuditingOptions *auditingclient.Options `json:"auditing,omitempty" yaml:"auditing,omitempty" mapstructure:"auditing"` + EventsOptions *events.Options `json:"events,omitempty" yaml:"events,omitempty" mapstructure:"events"` + AuditingOptions *auditing.Options `json:"auditing,omitempty" yaml:"auditing,omitempty" mapstructure:"auditing"` AlertingOptions *alerting.Options `json:"alerting,omitempty" yaml:"alerting,omitempty" mapstructure:"alerting"` NotificationOptions *notification.Options `json:"notification,omitempty" yaml:"notification,omitempty" mapstructure:"notification"` } @@ -113,12 +113,12 @@ func New() *Config { MonitoringOptions: prometheus.NewPrometheusOptions(), AlertingOptions: alerting.NewAlertingOptions(), NotificationOptions: notification.NewNotificationOptions(), - LoggingOptions: elasticsearch.NewElasticSearchOptions(), + LoggingOptions: logging.NewLoggingOptions(), AuthenticationOptions: authoptions.NewAuthenticateOptions(), AuthorizationOptions: authorizationoptions.NewAuthorizationOptions(), MultiClusterOptions: multicluster.NewOptions(), - EventsOptions: eventsclient.NewElasticSearchOptions(), - AuditingOptions: auditingclient.NewElasticSearchOptions(), + EventsOptions: events.NewEventsOptions(), + AuditingOptions: auditing.NewAuditingOptions(), } } diff --git a/pkg/apiserver/config/config_test.go b/pkg/apiserver/config/config_test.go index 42445b544007aa04200dd1903a5de6684424081b..b55d7dfc36064a080049ab8de068770883cc4608 100644 --- a/pkg/apiserver/config/config_test.go +++ b/pkg/apiserver/config/config_test.go @@ -25,13 +25,13 @@ import ( authoptions "kubesphere.io/kubesphere/pkg/apiserver/authentication/options" authorizationoptions "kubesphere.io/kubesphere/pkg/apiserver/authorization/options" "kubesphere.io/kubesphere/pkg/simple/client/alerting" - auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch" + "kubesphere.io/kubesphere/pkg/simple/client/auditing" "kubesphere.io/kubesphere/pkg/simple/client/cache" "kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins" - eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events/elasticsearch" + "kubesphere.io/kubesphere/pkg/simple/client/events" "kubesphere.io/kubesphere/pkg/simple/client/k8s" "kubesphere.io/kubesphere/pkg/simple/client/ldap" - "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch" + "kubesphere.io/kubesphere/pkg/simple/client/logging" "kubesphere.io/kubesphere/pkg/simple/client/monitoring/prometheus" "kubesphere.io/kubesphere/pkg/simple/client/multicluster" "kubesphere.io/kubesphere/pkg/simple/client/network" @@ -110,7 +110,7 @@ func newTestConfig() (*Config, error) { MonitoringOptions: &prometheus.Options{ Endpoint: "http://prometheus.kubesphere-monitoring-system.svc", }, - LoggingOptions: &elasticsearch.Options{ + LoggingOptions: &logging.Options{ Host: "http://elasticsearch-logging.kubesphere-logging-system.svc:9200", IndexPrefix: "elk", Version: "6", @@ -144,12 +144,12 @@ func newTestConfig() (*Config, error) { MultiClusterOptions: &multicluster.Options{ Enable: false, }, - EventsOptions: &eventsclient.Options{ + EventsOptions: &events.Options{ Host: "http://elasticsearch-logging-data.kubesphere-logging-system.svc:9200", IndexPrefix: "ks-logstash-events", Version: "6", }, - AuditingOptions: &auditingclient.Options{ + AuditingOptions: &auditing.Options{ Host: "http://elasticsearch-logging-data.kubesphere-logging-system.svc:9200", IndexPrefix: "ks-logstash-auditing", Version: "6", diff --git a/pkg/kapis/tenant/v1alpha2/handler.go b/pkg/kapis/tenant/v1alpha2/handler.go index c95769f92e422071853be5404b7d7d4637239e6f..65bddea452eab072a0222781abbb488f16190620 100644 --- a/pkg/kapis/tenant/v1alpha2/handler.go +++ b/pkg/kapis/tenant/v1alpha2/handler.go @@ -48,7 +48,7 @@ type tenantHandler struct { } func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, - evtsClient events.Client, loggingClient logging.Interface, auditingclient auditing.Client, + evtsClient events.Client, loggingClient logging.Client, auditingclient auditing.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer) *tenantHandler { return &tenantHandler{ diff --git a/pkg/kapis/tenant/v1alpha2/register.go b/pkg/kapis/tenant/v1alpha2/register.go index e6e5ce27846b12f3b7a2f3f1e67137d56b424834..cf295e5e4f79b0222f19a1d948e046b7a8d830a3 100644 --- a/pkg/kapis/tenant/v1alpha2/register.go +++ b/pkg/kapis/tenant/v1alpha2/register.go @@ -52,7 +52,7 @@ func Resource(resource string) schema.GroupResource { } func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8sclient kubernetes.Interface, - ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Interface, + ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Client, auditingclient auditing.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer) error { mimePatch := []string{restful.MIME_JSON, runtime.MimeMergePatchJson, runtime.MimeJsonPatchJson} diff --git a/pkg/models/logging/logging.go b/pkg/models/logging/logging.go index 69ff75fb6a728d097829c58f6fd2cba5a6c682ae..dd4a07a988cfe409c136ae992a001c3d07e9ad8f 100644 --- a/pkg/models/logging/logging.go +++ b/pkg/models/logging/logging.go @@ -30,10 +30,10 @@ type LoggingOperator interface { } type loggingOperator struct { - c logging.Interface + c logging.Client } -func NewLoggingOperator(client logging.Interface) LoggingOperator { +func NewLoggingOperator(client logging.Client) LoggingOperator { return &loggingOperator{client} } diff --git a/pkg/models/tenant/tenant.go b/pkg/models/tenant/tenant.go index 69feffe1416b99ebd8dbb687713ab46d5efb4630..f865bcd028ab1dac2e81ffaf3498d650cbadbefc 100644 --- a/pkg/models/tenant/tenant.go +++ b/pkg/models/tenant/tenant.go @@ -91,7 +91,7 @@ type tenantOperator struct { auditing auditing.Interface } -func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Interface, auditingclient auditingclient.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer) Interface { +func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Client, auditingclient auditingclient.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer) Interface { return &tenantOperator{ am: am, authorizer: authorizer, diff --git a/pkg/simple/client/auditing/elasticsearch/clients.go b/pkg/simple/client/auditing/elasticsearch/clients.go deleted file mode 100644 index b9bf905756fef17f872ddcf46e6d0740623a3587..0000000000000000000000000000000000000000 --- a/pkg/simple/client/auditing/elasticsearch/clients.go +++ /dev/null @@ -1,171 +0,0 @@ -/* -Copyright 2020 The KubeSphere Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package elasticsearch - -import ( - "fmt" - es5 "github.com/elastic/go-elasticsearch/v5" - es5api "github.com/elastic/go-elasticsearch/v5/esapi" - es6 "github.com/elastic/go-elasticsearch/v6" - es6api "github.com/elastic/go-elasticsearch/v6/esapi" - es7 "github.com/elastic/go-elasticsearch/v7" - es7api "github.com/elastic/go-elasticsearch/v7/esapi" - jsoniter "github.com/json-iterator/go" - "io" - "net/http" -) - -type Request struct { - Index string - Body io.Reader -} - -type Response struct { - Hits Hits `json:"hits"` - Aggregations map[string]jsoniter.RawMessage `json:"aggregations"` -} - -type Hits struct { - Total int64 `json:"total"` - Hits jsoniter.RawMessage `json:"hits"` -} - -type Error struct { - Type string `json:"type"` - Reason string `json:"reason"` - Status int `json:"status"` -} - -func (e Error) Error() string { - return fmt.Sprintf("%s %s: %s", http.StatusText(e.Status), e.Type, e.Reason) -} - -type ClientV5 es5.Client - -func (c *ClientV5) ExSearch(r *Request) (*Response, error) { - return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body), c.Search.WithIgnoreUnavailable(true))) -} -func (c *ClientV5) parse(resp *es5api.Response, err error) (*Response, error) { - if err != nil { - return nil, fmt.Errorf("error getting response: %s", err) - } - defer func() { - _ = resp.Body.Close() - }() - if resp.IsError() { - return nil, fmt.Errorf(resp.String()) - } - var r struct { - Hits struct { - Total int64 `json:"total"` - Hits jsoniter.RawMessage `json:"hits"` - } `json:"hits"` - Aggregations map[string]jsoniter.RawMessage `json:"aggregations"` - } - if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { - return nil, fmt.Errorf("error parsing the response body: %s", err) - } - return &Response{ - Hits: Hits{Total: r.Hits.Total, Hits: r.Hits.Hits}, - Aggregations: r.Aggregations, - }, nil -} -func (c *ClientV5) Version() (string, error) { - resp, err := c.Info() - if err != nil { - return "", err - } - defer func() { - _ = resp.Body.Close() - }() - if resp.IsError() { - return "", fmt.Errorf(resp.String()) - } - var r map[string]interface{} - if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { - return "", fmt.Errorf("error parsing the response body: %s", err) - } - return fmt.Sprintf("%s", r["version"].(map[string]interface{})["number"]), nil -} - -type ClientV6 es6.Client - -func (c *ClientV6) ExSearch(r *Request) (*Response, error) { - return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body), c.Search.WithIgnoreUnavailable(true))) -} -func (c *ClientV6) parse(resp *es6api.Response, err error) (*Response, error) { - if err != nil { - return nil, fmt.Errorf("error getting response: %s", err) - } - defer func() { - _ = resp.Body.Close() - }() - if resp.IsError() { - return nil, fmt.Errorf(resp.String()) - } - var r struct { - Hits *struct { - Total int64 `json:"total"` - Hits jsoniter.RawMessage `json:"hits"` - } `json:"hits"` - Aggregations map[string]jsoniter.RawMessage `json:"aggregations"` - } - if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { - return nil, fmt.Errorf("error parsing the response body: %s", err) - } - return &Response{ - Hits: Hits{Total: r.Hits.Total, Hits: r.Hits.Hits}, - Aggregations: r.Aggregations, - }, nil -} - -type ClientV7 es7.Client - -func (c *ClientV7) ExSearch(r *Request) (*Response, error) { - return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body), c.Search.WithIgnoreUnavailable(true))) -} -func (c *ClientV7) parse(resp *es7api.Response, err error) (*Response, error) { - if err != nil { - return nil, fmt.Errorf("error getting response: %s", err) - } - defer func() { - _ = resp.Body.Close() - }() - if resp.IsError() { - return nil, fmt.Errorf(resp.String()) - } - var r struct { - Hits *struct { - Total struct { - Value int64 `json:"value"` - } `json:"total"` - Hits jsoniter.RawMessage `json:"hits"` - } `json:"hits"` - Aggregations map[string]jsoniter.RawMessage `json:"aggregations"` - } - if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { - return nil, fmt.Errorf("error parsing the response body: %s", err) - } - return &Response{ - Hits: Hits{Total: r.Hits.Total.Value, Hits: r.Hits.Hits}, - Aggregations: r.Aggregations, - }, nil -} - -type client interface { - ExSearch(r *Request) (*Response, error) -} diff --git a/pkg/simple/client/auditing/elasticsearch/elasticsearch.go b/pkg/simple/client/auditing/elasticsearch/elasticsearch.go index 75b1b0a7cbe198c1cb09c4653d2e79cf216a2ce9..8dbbb9bb49a3f8ef706b0fa53c4c16c57adad6c0 100644 --- a/pkg/simple/client/auditing/elasticsearch/elasticsearch.go +++ b/pkg/simple/client/auditing/elasticsearch/elasticsearch.go @@ -17,497 +17,198 @@ limitations under the License. package elasticsearch import ( - "bytes" "fmt" - "kubesphere.io/kubesphere/pkg/utils/esutil" - "strings" - "sync" - "time" - - es5 "github.com/elastic/go-elasticsearch/v5" - es6 "github.com/elastic/go-elasticsearch/v6" - es7 "github.com/elastic/go-elasticsearch/v7" - jsoniter "github.com/json-iterator/go" + "github.com/json-iterator/go" "kubesphere.io/kubesphere/pkg/simple/client/auditing" -) - -const ( - ElasticV5 = "5" - ElasticV6 = "6" - ElasticV7 = "7" + "kubesphere.io/kubesphere/pkg/simple/client/es" + "kubesphere.io/kubesphere/pkg/simple/client/es/query" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary -type Elasticsearch struct { - host string - version string - index string - - c client - mux sync.Mutex +type client struct { + c *es.Client } -func (es *Elasticsearch) SearchAuditingEvent(filter *auditing.Filter, from, size int64, +func (c *client) SearchAuditingEvent(filter *auditing.Filter, from, size int64, sort string) (*auditing.Events, error) { - if err := es.loadClient(); err != nil { - return &auditing.Events{}, err - } - - queryPart := parseToQueryPart(filter) - if sort == "" { - sort = "desc" - } - sortPart := []map[string]interface{}{{ - "RequestReceivedTimestamp": map[string]string{"order": sort}, - }} - b := map[string]interface{}{ - "from": from, - "size": size, - "query": queryPart, - "sort": sortPart, - } + b := query.NewBuilder(). + WithQuery(parseToQueryPart(filter)). + WithSort("RequestReceivedTimestamp", sort). + WithFrom(from). + WithSize(size) - body, err := json.Marshal(b) - if err != nil { - return nil, err - } - resp, err := es.c.ExSearch(&Request{ - Index: resolveIndexNames(es.index, filter.StartTime, filter.EndTime), - Body: bytes.NewBuffer(body), - }) + resp, err := c.c.Search(b, filter.StartTime, filter.EndTime, false) if err != nil || resp == nil { return nil, err } - var innerHits []struct { - *auditing.Event `json:"_source"` + events := &auditing.Events{Total: c.c.GetTotalHitCount(resp.Total)} + for _, hit := range resp.AllHits { + events.Records = append(events.Records, hit.Source) } - if err := json.Unmarshal(resp.Hits.Hits, &innerHits); err != nil { - return nil, err - } - evts := auditing.Events{Total: resp.Hits.Total} - for _, hit := range innerHits { - evts.Records = append(evts.Records, hit.Event) - } - return &evts, nil + return events, nil } -func (es *Elasticsearch) CountOverTime(filter *auditing.Filter, interval string) (*auditing.Histogram, error) { - - if err := es.loadClient(); err != nil { - return &auditing.Histogram{}, err - } +func (c *client) CountOverTime(filter *auditing.Filter, interval string) (*auditing.Histogram, error) { if interval == "" { interval = "15m" } - queryPart := parseToQueryPart(filter) - aggName := "events_count_over_timestamp" - aggsPart := map[string]interface{}{ - aggName: map[string]interface{}{ - "date_histogram": map[string]string{ - "field": "RequestReceivedTimestamp", - "interval": interval, - }, - }, - } - b := map[string]interface{}{ - "query": queryPart, - "aggs": aggsPart, - "size": 0, // do not get docs - } + b := query.NewBuilder(). + WithQuery(parseToQueryPart(filter)). + WithAggregations(query.NewAggregations(). + WithDateHistogramAggregation("RequestReceivedTimestamp", interval)). + WithSize(0) - body, err := json.Marshal(b) - if err != nil { - return nil, err - } - resp, err := es.c.ExSearch(&Request{ - Index: resolveIndexNames(es.index, filter.StartTime, filter.EndTime), - Body: bytes.NewBuffer(body), - }) + resp, err := c.c.Search(b, filter.StartTime, filter.EndTime, false) if err != nil || resp == nil { return nil, err } - raw, ok := resp.Aggregations[aggName] - if !ok || len(raw) == 0 { - return &auditing.Histogram{}, nil - } - var agg struct { - Buckets []struct { - KeyAsString string `json:"key_as_string"` - Key int64 `json:"key"` - DocCount int64 `json:"doc_count"` - } `json:"buckets"` - } - if err := json.Unmarshal(raw, &agg); err != nil { - return nil, err - } - h := auditing.Histogram{Total: resp.Hits.Total} - for _, b := range agg.Buckets { + h := auditing.Histogram{Total: c.c.GetTotalHitCount(resp.Total)} + for _, bucket := range resp.Buckets { h.Buckets = append(h.Buckets, - auditing.Bucket{Time: b.Key, Count: b.DocCount}) + auditing.Bucket{Time: bucket.Key, Count: bucket.Count}) } return &h, nil } -func (es *Elasticsearch) StatisticsOnResources(filter *auditing.Filter) (*auditing.Statistics, error) { +func (c *client) StatisticsOnResources(filter *auditing.Filter) (*auditing.Statistics, error) { - if err := es.loadClient(); err != nil { - return &auditing.Statistics{}, err - } + b := query.NewBuilder(). + WithQuery(parseToQueryPart(filter)). + WithAggregations(query.NewAggregations(). + WithCardinalityAggregation("AuditID.keyword")). + WithSize(0) - queryPart := parseToQueryPart(filter) - aggName := "resources_count" - aggsPart := map[string]interface{}{ - aggName: map[string]interface{}{ - "cardinality": map[string]string{ - "field": "AuditID.keyword", - }, - }, - } - b := map[string]interface{}{ - "query": queryPart, - "aggs": aggsPart, - "size": 0, // do not get docs - } - - body, err := json.Marshal(b) - if err != nil { - return nil, err - } - resp, err := es.c.ExSearch(&Request{ - Index: resolveIndexNames(es.index, filter.StartTime, filter.EndTime), - Body: bytes.NewBuffer(body), - }) + resp, err := c.c.Search(b, filter.StartTime, filter.EndTime, false) if err != nil || resp == nil { return nil, err } - raw, ok := resp.Aggregations[aggName] - if !ok || len(raw) == 0 { - return &auditing.Statistics{}, nil - } - var agg struct { - Value int64 `json:"value"` - } - if err := json.Unmarshal(raw, &agg); err != nil { - return nil, err - } - return &auditing.Statistics{ - Resources: agg.Value, - Events: resp.Hits.Total, + Resources: resp.Value, + Events: c.c.GetTotalHitCount(resp.Total), }, nil } -func NewClient(options *Options) (*Elasticsearch, error) { - es := &Elasticsearch{ - host: options.Host, - version: options.Version, - index: options.IndexPrefix, - } - - err := es.initEsClient(es.version) - return es, err -} - -func (es *Elasticsearch) initEsClient(version string) error { - clientV5 := func() (*ClientV5, error) { - c, err := es5.NewClient(es5.Config{Addresses: []string{es.host}}) - if err != nil { - return nil, err - } - return (*ClientV5)(c), nil - } - clientV6 := func() (*ClientV6, error) { - c, err := es6.NewClient(es6.Config{Addresses: []string{es.host}}) - if err != nil { - return nil, err - } - return (*ClientV6)(c), nil - } - clientV7 := func() (*ClientV7, error) { - c, err := es7.NewClient(es7.Config{Addresses: []string{es.host}}) - if err != nil { - return nil, err - } - return (*ClientV7)(c), nil - } +func NewClient(options *auditing.Options) (auditing.Client, error) { + c := &client{} var err error - switch version { - case ElasticV5: - es.c, err = clientV5() - case ElasticV6: - es.c, err = clientV6() - case ElasticV7: - es.c, err = clientV7() - case "": - es.c = nil - default: - err = fmt.Errorf("unsupported elasticsearch version %s", es.version) - } - - return err + c.c, err = es.NewClient(options.Host, options.IndexPrefix, options.Version) + return c, err } -func (es *Elasticsearch) loadClient() error { - - // Check if Elasticsearch client has been initialized. - if es.c != nil { +func parseToQueryPart(f *auditing.Filter) *query.Query { + if f == nil { return nil } - // Create Elasticsearch client. - es.mux.Lock() - defer es.mux.Unlock() + var mini int32 = 1 + b := query.NewBool() - if es.c != nil { - return nil + bi := query.NewBool().WithMinimumShouldMatch(mini) + for k, v := range f.ObjectRefNamespaceMap { + bi.AppendShould(query.NewBool(). + AppendFilter(query.NewMatchPhrase("ObjectRef.Namespace", k)). + AppendFilter(query.NewRange("RequestReceivedTimestamp"). + WithGTE(v))) } - c, e := es5.NewClient(es5.Config{Addresses: []string{es.host}}) - if e != nil { - return e + for k, v := range f.WorkspaceMap { + bi.AppendShould(query.NewBool(). + AppendFilter(query.NewMatchPhrase("Workspace", k)). + AppendFilter(query.NewRange("RequestReceivedTimestamp"). + WithGTE(v))) } - version, err := (*ClientV5)(c).Version() - if err != nil { - return err - } + b.AppendFilter(bi) - v := strings.Split(version, ".")[0] - err = es.initEsClient(v) - if err != nil { - return err - } + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrase("ObjectRef.Namespace.keyword", f.ObjectRefNamespaces)). + WithMinimumShouldMatch(mini)) - es.version = v - return nil -} - -func parseToQueryPart(f *auditing.Filter) interface{} { - if f == nil { - return nil - } - type BoolBody struct { - Filter []map[string]interface{} `json:"filter,omitempty"` - Should []map[string]interface{} `json:"should,omitempty"` - MinimumShouldMatch *int `json:"minimum_should_match,omitempty"` - } - var mini = 1 - b := BoolBody{} - queryBody := map[string]interface{}{ - "bool": &b, + bi = query.NewBool().WithMinimumShouldMatch(mini) + for _, ns := range f.ObjectRefNamespaceFuzzy { + bi.AppendShould(query.NewWildcard("ObjectRef.Namespace.keyword", fmt.Sprintf("*"+ns+"*"))) } + b.AppendFilter(bi) - if len(f.ObjectRefNamespaceMap) > 0 || len(f.WorkspaceMap) > 0 { - bi := BoolBody{MinimumShouldMatch: &mini} - for k, v := range f.ObjectRefNamespaceMap { - bi.Should = append(bi.Should, map[string]interface{}{ - "bool": &BoolBody{ - Filter: []map[string]interface{}{{ - "match_phrase": map[string]string{"ObjectRef.Namespace.keyword": k}, - }, { - "range": map[string]interface{}{ - "RequestReceivedTimestamp": map[string]interface{}{ - "gte": v, - }, - }, - }}, - }, - }) - } - - for k, v := range f.WorkspaceMap { - bi.Should = append(bi.Should, map[string]interface{}{ - "bool": &BoolBody{ - Filter: []map[string]interface{}{{ - "match_phrase": map[string]string{"Workspace.keyword": k}, - }, { - "range": map[string]interface{}{ - "RequestReceivedTimestamp": map[string]interface{}{ - "gte": v, - }, - }, - }}, - }, - }) - } - - if len(bi.Should) > 0 { - b.Filter = append(b.Filter, map[string]interface{}{"bool": &bi}) - } - } + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrase("Workspace.keyword", f.Workspaces)). + WithMinimumShouldMatch(mini)) - shouldBoolbody := func(mtype, fieldName string, fieldValues []string, fieldValueMutate func(string) string) *BoolBody { - bi := BoolBody{MinimumShouldMatch: &mini} - for _, v := range fieldValues { - if fieldValueMutate != nil { - v = fieldValueMutate(v) - } - bi.Should = append(bi.Should, map[string]interface{}{ - mtype: map[string]string{fieldName: v}, - }) - } - if len(bi.Should) == 0 { - return nil - } - return &bi + bi = query.NewBool().WithMinimumShouldMatch(mini) + for _, ws := range f.WorkspaceFuzzy { + bi.AppendShould(query.NewWildcard("Workspace.keyword", fmt.Sprintf("*"+ws+"*"))) } + b.AppendFilter(bi) - if len(f.ObjectRefNamespaces) > 0 { - if bi := shouldBoolbody("match_phrase", "ObjectRef.Namespace.keyword", - f.ObjectRefNamespaces, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } - if len(f.ObjectRefNamespaceFuzzy) > 0 { - if bi := shouldBoolbody("wildcard", "ObjectRef.Namespace.keyword", - f.ObjectRefNamespaceFuzzy, func(s string) string { - return fmt.Sprintf("*" + s + "*") - }); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrase("ObjectRef.Name.keyword", f.ObjectRefNames)). + WithMinimumShouldMatch(mini)) - if len(f.Workspaces) > 0 { - if bi := shouldBoolbody("match_phrase", "Workspace.keyword", - f.Workspaces, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } - if len(f.WorkspaceFuzzy) > 0 { - if bi := shouldBoolbody("wildcard", "Workspace.keyword", - f.WorkspaceFuzzy, func(s string) string { - return fmt.Sprintf("*" + s + "*") - }); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } + bi = query.NewBool().WithMinimumShouldMatch(mini) + for _, name := range f.ObjectRefNameFuzzy { + bi.AppendShould(query.NewWildcard("ObjectRef.Name.keyword", fmt.Sprintf("*"+name+"*"))) } + b.AppendFilter(bi) - if len(f.ObjectRefNames) > 0 { - if bi := shouldBoolbody("match_phrase", "ObjectRef.Name.keyword", - f.ObjectRefNames, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } - if len(f.ObjectRefNameFuzzy) > 0 { - if bi := shouldBoolbody("wildcard", "ObjectRef.Name.keyword", - f.ObjectRefNameFuzzy, func(s string) string { - return fmt.Sprintf("*" + s + "*") - }); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrase("Verb", f.Verbs)). + WithMinimumShouldMatch(mini)) + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrase("Level", f.Levels)). + WithMinimumShouldMatch(mini)) - if len(f.Verbs) > 0 { - if bi := shouldBoolbody("match_phrase", "Verb.keyword", - f.Verbs, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } - if len(f.Levels) > 0 { - if bi := shouldBoolbody("match_phrase", "Level.keyword", - f.Levels, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } - - if len(f.SourceIpFuzzy) > 0 { - if bi := shouldBoolbody("wildcard", "SourceIPs.keyword", - f.SourceIpFuzzy, func(s string) string { - return fmt.Sprintf("*" + s + "*") - }); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } - - if len(f.Users) > 0 { - if bi := shouldBoolbody("match_phrase", "User.Username.keyword", - f.Users, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } - if len(f.UserFuzzy) > 0 { - if bi := shouldBoolbody("wildcard", "User.Username.keyword", - f.UserFuzzy, func(s string) string { - return fmt.Sprintf("*" + s + "*") - }); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } + bi = query.NewBool().WithMinimumShouldMatch(mini) + for _, ip := range f.SourceIpFuzzy { + bi.AppendShould(query.NewWildcard("SourceIPs.keyword", fmt.Sprintf("*"+ip+"*"))) } + b.AppendFilter(bi) - if len(f.GroupFuzzy) > 0 { - if bi := shouldBoolbody("wildcard", "User.Groups.keyword", - f.GroupFuzzy, func(s string) string { - return fmt.Sprintf("*" + s + "*") - }); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrase("User.Username.keyword", f.Users)). + WithMinimumShouldMatch(mini)) - if len(f.ObjectRefResources) > 0 { - if bi := shouldBoolbody("match_phrase_prefix", "ObjectRef.Resource.keyword", - f.ObjectRefResources, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } + bi = query.NewBool().WithMinimumShouldMatch(mini) + for _, user := range f.UserFuzzy { + bi.AppendShould(query.NewWildcard("User.Username.keyword", fmt.Sprintf("*"+user+"*"))) } + b.AppendFilter(bi) - if len(f.ObjectRefSubresources) > 0 { - if bi := shouldBoolbody("match_phrase_prefix", "ObjectRef.Subresource.keyword", - f.ObjectRefSubresources, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } + bi = query.NewBool().WithMinimumShouldMatch(mini) + for _, group := range f.GroupFuzzy { + bi.AppendShould(query.NewWildcard("User.Groups.keyword", fmt.Sprintf("*"+group+"*"))) } + b.AppendFilter(bi) - if f.ResponseCodes != nil && len(f.ResponseCodes) > 0 { - - bi := BoolBody{MinimumShouldMatch: &mini} - for _, v := range f.ResponseCodes { - bi.Should = append(bi.Should, map[string]interface{}{ - "term": map[string]int32{"ResponseStatus.code": v}, - }) - } + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrasePrefix("ObjectRef.Resource", f.ObjectRefResources)). + WithMinimumShouldMatch(mini)) + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrasePrefix("ObjectRef.Subresource", f.ObjectRefSubresources)). + WithMinimumShouldMatch(mini)) + b.AppendFilter(query.NewBool(). + AppendShould(query.NewTerms("ResponseStatus.code", f.ResponseCodes)). + WithMinimumShouldMatch(mini)) + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrase("ResponseStatus.status", f.ResponseStatus)). + WithMinimumShouldMatch(mini)) - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) + r := query.NewRange("RequestReceivedTimestamp") + if !f.StartTime.IsZero() { + r.WithGTE(f.StartTime) } - - if len(f.ResponseStatus) > 0 { - if bi := shouldBoolbody("match_phrase", "ResponseStatus.status", - f.ResponseStatus, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } + if !f.EndTime.IsZero() { + r.WithLTE(f.EndTime) } - if f.StartTime != nil || f.EndTime != nil { - m := make(map[string]*time.Time) - if f.StartTime != nil { - m["gte"] = f.StartTime - } - if f.EndTime != nil { - m["lte"] = f.EndTime - } - b.Filter = append(b.Filter, map[string]interface{}{ - "range": map[string]interface{}{"RequestReceivedTimestamp": m}, - }) - - } + b.AppendFilter(r) - return queryBody -} - -func resolveIndexNames(prefix string, start, end *time.Time) string { - var s, e time.Time - if start != nil { - s = *start - } - if end != nil { - e = *end - } - return esutil.ResolveIndexNames(prefix, s, e) + return query.NewQuery().WithBool(b) } diff --git a/pkg/simple/client/auditing/elasticsearch/elasticsearch_test.go b/pkg/simple/client/auditing/elasticsearch/elasticsearch_test.go index 5fd8b593984f63d1fda42ffdf9b40f1642cd462a..d17e20cf67bff7ecd7ad5065255a831dc3863dae 100644 --- a/pkg/simple/client/auditing/elasticsearch/elasticsearch_test.go +++ b/pkg/simple/client/auditing/elasticsearch/elasticsearch_test.go @@ -22,7 +22,6 @@ import ( "kubesphere.io/kubesphere/pkg/simple/client/auditing" "net/http" "net/http/httptest" - "strconv" "strings" "testing" "time" @@ -69,7 +68,7 @@ func TestStatisticsOnResources(t *testing.T) { ] }, "aggregations": { - "resources_count": { + "cardinality_aggregation": { "value": 100 } } @@ -116,18 +115,18 @@ func TestStatisticsOnResources(t *testing.T) { mes := MockElasticsearchService("/", test.fakeCode, test.fakeResp) defer mes.Close() - es, err := NewClient(&Options{Host: mes.URL, IndexPrefix: "ks-logstash-events", Version: "6"}) + c, err := NewClient(&auditing.Options{Host: mes.URL, IndexPrefix: "ks-logstash-events", Version: test.fakeVersion}) if err != nil { t.Fatal(err) } - stats, err := es.StatisticsOnResources(&test.filter) + stats, err := c.StatisticsOnResources(&test.filter) if test.expectedError { if err == nil { t.Fatalf("expected err like %s", test.fakeResp) - } else if !strings.Contains(err.Error(), strconv.Itoa(test.fakeCode)) { + } else if !strings.Contains(err.Error(), "index_not_found_exception") { t.Fatalf("err does not contain expected code: %d", test.fakeCode) } } else { @@ -144,187 +143,209 @@ func TestStatisticsOnResources(t *testing.T) { func TestParseToQueryPart(t *testing.T) { q := ` { - "bool": { - "filter": [ - { - "bool": { - "should": [ - { - "bool": { - "filter": [ - { - "match_phrase": { - "ObjectRef.Namespace.keyword": "kubesphere-system" - } - }, - { - "range": { - "RequestReceivedTimestamp": { - "gte": "2020-01-01T01:01:01.000000001Z" - } - } - } - ] - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "match_phrase": { - "ObjectRef.Name.keyword": "istio" - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "wildcard": { - "ObjectRef.Name.keyword": "*istio*" - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "match_phrase": { - "Verb.keyword": "create" - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "match_phrase": { - "Level.keyword": "Metadata" - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "wildcard": { - "SourceIPs.keyword": "*192.168*" - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "match_phrase": { - "User.Username.keyword": "system:serviceaccount:kubesphere-system:kubesphere" - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "wildcard": { - "User.Username.keyword": "*system:serviceaccount*" - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "wildcard": { - "User.Groups.keyword": "*system:serviceaccounts*" - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "match_phrase_prefix": { - "ObjectRef.Resource.keyword": "devops" - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "match_phrase_prefix": { - "ObjectRef.Subresource.keyword": "pipeline" - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "term": { - "ResponseStatus.code": 404 - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "match_phrase": { - "ResponseStatus.status": "Failure" - } - } - ], - "minimum_should_match": 1 - } - }, - { - "range": { - "RequestReceivedTimestamp": { - "gte": "2019-12-01T01:01:01.000000001Z", - "lte": "2020-01-01T01:01:01.000000001Z" - } - } - } - ] - } + "query":{ + "bool":{ + "filter":[ + { + "bool":{ + "should":[ + { + "bool":{ + "filter":[ + { + "match_phrase":{ + "ObjectRef.Namespace":"kubesphere-system" + } + }, + { + "range":{ + "RequestReceivedTimestamp":{ + "gte":"2020-01-01T01:01:01.000000001Z" + } + } + } + ] + } + }, + { + "bool":{ + "filter":[ + { + "match_phrase":{ + "Workspace":"system-workspace" + } + }, + { + "range":{ + "RequestReceivedTimestamp":{ + "gte":"2020-01-01T01:01:01.000000001Z" + } + } + } + ] + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "match_phrase":{ + "ObjectRef.Name.keyword":"devops" + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "wildcard":{ + "ObjectRef.Name.keyword":"*dev*" + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "match_phrase":{ + "Verb":"create" + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "match_phrase":{ + "Level":"Metadata" + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "wildcard":{ + "SourceIPs.keyword":"*192.168*" + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "match_phrase":{ + "User.Username.keyword":"system:serviceaccount:kubesphere-system:kubesphere" + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "wildcard":{ + "User.Username.keyword":"*system*" + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "wildcard":{ + "User.Groups.keyword":"*system*" + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "match_phrase_prefix":{ + "ObjectRef.Resource":"pods" + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "match_phrase_prefix":{ + "ObjectRef.Subresource":"exec" + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "terms":{ + "ResponseStatus.code":[ + 404 + ] + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "match_phrase":{ + "ResponseStatus.status":"Failure" + } + } + ], + "minimum_should_match":1 + } + }, + { + "range":{ + "RequestReceivedTimestamp":{ + "gte":"2019-12-01T01:01:01.000000001Z", + "lte":"2020-01-01T01:01:01.000000001Z" + } + } + } + ] + } + } } ` nsCreateTime := time.Date(2020, time.Month(1), 1, 1, 1, 1, 1, time.UTC) @@ -335,20 +356,23 @@ func TestParseToQueryPart(t *testing.T) { ObjectRefNamespaceMap: map[string]time.Time{ "kubesphere-system": nsCreateTime, }, - ObjectRefNames: []string{"istio"}, - ObjectRefNameFuzzy: []string{"istio"}, + WorkspaceMap: map[string]time.Time{ + "system-workspace": nsCreateTime, + }, + ObjectRefNames: []string{"devops"}, + ObjectRefNameFuzzy: []string{"dev"}, Levels: []string{"Metadata"}, Verbs: []string{"create"}, Users: []string{"system:serviceaccount:kubesphere-system:kubesphere"}, - UserFuzzy: []string{"system:serviceaccount"}, - GroupFuzzy: []string{"system:serviceaccounts"}, + UserFuzzy: []string{"system"}, + GroupFuzzy: []string{"system"}, SourceIpFuzzy: []string{"192.168"}, - ObjectRefResources: []string{"devops"}, - ObjectRefSubresources: []string{"pipeline"}, + ObjectRefResources: []string{"pods"}, + ObjectRefSubresources: []string{"exec"}, ResponseCodes: []int32{404}, ResponseStatus: []string{"Failure"}, - StartTime: &startTime, - EndTime: &endTime, + StartTime: startTime, + EndTime: endTime, } qp := parseToQueryPart(filter) diff --git a/pkg/simple/client/auditing/interface.go b/pkg/simple/client/auditing/interface.go index 48e1907f28d39776c18163fc3194e9ddc96dc41f..1fc92786c1544f1c49db93f03f47b5d6de177393 100644 --- a/pkg/simple/client/auditing/interface.go +++ b/pkg/simple/client/auditing/interface.go @@ -45,15 +45,15 @@ type Filter struct { ObjectRefSubresources []string ResponseCodes []int32 ResponseStatus []string - StartTime *time.Time - EndTime *time.Time + StartTime time.Time + EndTime time.Time } type Event map[string]interface{} type Events struct { - Total int64 `json:"total" description:"total number of matched results"` - Records []*Event `json:"records" description:"actual array of results"` + Total int64 `json:"total" description:"total number of matched results"` + Records []interface{} `json:"records" description:"actual array of results"` } type Histogram struct { diff --git a/pkg/simple/client/auditing/elasticsearch/options.go b/pkg/simple/client/auditing/options.go similarity index 95% rename from pkg/simple/client/auditing/elasticsearch/options.go rename to pkg/simple/client/auditing/options.go index 161ed30ac67178ae32b03106c856be928c2008d7..11824a9cacaa2da2535cbddb0b10b73ecdcf60b1 100644 --- a/pkg/simple/client/auditing/elasticsearch/options.go +++ b/pkg/simple/client/auditing/options.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package elasticsearch +package auditing import ( "github.com/spf13/pflag" @@ -36,7 +36,7 @@ type Options struct { Version string `json:"version" yaml:"version"` } -func NewElasticSearchOptions() *Options { +func NewAuditingOptions() *Options { return &Options{ Host: "", IndexPrefix: "ks-logstash-auditing", @@ -65,6 +65,8 @@ func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) { "The batch size of auditing events.") fs.DurationVar(&s.EventBatchInterval, "auditing-event-batch-interval", c.EventBatchInterval, "The batch interval of auditing events.") + fs.StringVar(&s.WebhookUrl, "auditing-webhook-url", c.WebhookUrl, "Auditing webhook url") + fs.StringVar(&s.Host, "auditing-elasticsearch-host", c.Host, ""+ "Elasticsearch service host. KubeSphere is using elastic as auditing store, "+ "if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+ diff --git a/pkg/simple/client/es/client.go b/pkg/simple/client/es/client.go new file mode 100644 index 0000000000000000000000000000000000000000..41e8dbe4b4fdf6c6a4130df29a19667721d77c46 --- /dev/null +++ b/pkg/simple/client/es/client.go @@ -0,0 +1,189 @@ +/* +Copyright 2020 KubeSphere Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package es + +import ( + "context" + "fmt" + "github.com/json-iterator/go" + "kubesphere.io/kubesphere/pkg/simple/client/es/query" + "kubesphere.io/kubesphere/pkg/simple/client/es/versions" + "kubesphere.io/kubesphere/pkg/simple/client/es/versions/v5" + "kubesphere.io/kubesphere/pkg/simple/client/es/versions/v6" + "kubesphere.io/kubesphere/pkg/simple/client/es/versions/v7" + "kubesphere.io/kubesphere/pkg/utils/esutil" + "strings" + "sync" + "time" +) + +const ( + ElasticV5 = "5" + ElasticV6 = "6" + ElasticV7 = "7" +) + +// Elasticsearch client +type Client struct { + host string + version string + index string + + c versions.Client + mux sync.Mutex +} + +func NewClient(host, indexPrefix, version string) (*Client, error) { + var err error + es := &Client{ + host: host, + version: version, + index: indexPrefix, + } + + switch es.version { + case ElasticV5: + es.c, err = v5.New(es.host, es.index) + case ElasticV6: + es.c, err = v6.New(es.host, es.index) + case ElasticV7: + es.c, err = v7.New(es.host, es.index) + case "": + es.c = nil + default: + return nil, fmt.Errorf("unsupported elasticsearch version %s", es.version) + } + + return es, err +} + +func (c *Client) loadClient() error { + // Check if Elasticsearch client has been initialized. + if c.c != nil { + return nil + } + + // Create Elasticsearch client. + c.mux.Lock() + defer c.mux.Unlock() + + if c.c != nil { + return nil + } + + // Detect Elasticsearch server version using Info API. + // Info API is backward compatible across v5, v6 and v7. + esv6, err := v6.New(c.host, "") + if err != nil { + return err + } + + res, err := esv6.Client.Info( + esv6.Client.Info.WithContext(context.Background()), + ) + if err != nil { + return err + } + + defer func() { + _ = res.Body.Close() + }() + + var b map[string]interface{} + if err = jsoniter.NewDecoder(res.Body).Decode(&b); err != nil { + return err + } + if res.IsError() { + // Print the response status and error information. + e, _ := b["error"].(map[string]interface{}) + return fmt.Errorf("[%s] type: %v, reason: %v", res.Status(), e["type"], e["reason"]) + } + + // get the major version + version, _ := b["version"].(map[string]interface{}) + number, _ := version["number"].(string) + if number == "" { + return fmt.Errorf("failed to detect elastic version number") + } + + var vc versions.Client + v := strings.Split(number, ".")[0] + switch v { + case ElasticV5: + vc, err = v5.New(c.host, c.index) + case ElasticV6: + vc, err = v6.New(c.host, c.index) + case ElasticV7: + vc, err = v7.New(c.host, c.index) + default: + err = fmt.Errorf("unsupported elasticsearch version %s", version) + } + + if err != nil { + return err + } + + c.c = vc + c.version = v + return nil +} + +func (c *Client) Search(builder *query.Builder, startTime, endTime time.Time, scroll bool) (*Response, error) { + + err := c.loadClient() + if err != nil { + return nil, err + } + + // Initial Search + body, err := builder.Bytes() + if err != nil { + return nil, err + } + + res, err := c.c.Search(esutil.ResolveIndexNames(c.index, startTime, endTime), body, scroll) + if err != nil { + return nil, err + } + + return parseResponse(res) +} + +func (c *Client) Scroll(id string) (*Response, error) { + + err := c.loadClient() + if err != nil { + return nil, err + } + + res, err := c.c.Scroll(id) + if err != nil { + return nil, err + } + + return parseResponse(res) +} + +func (c *Client) ClearScroll(id string) { + if id != "" { + c.c.ClearScroll(id) + } +} + +func (c *Client) GetTotalHitCount(v interface{}) int64 { + return c.c.GetTotalHitCount(v) +} diff --git a/pkg/simple/client/es/client_test.go b/pkg/simple/client/es/client_test.go new file mode 100644 index 0000000000000000000000000000000000000000..67534ea6e40e090532347cef3227d33cc558ed99 --- /dev/null +++ b/pkg/simple/client/es/client_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2020 KubeSphere Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package es + +import ( + "fmt" + "github.com/google/go-cmp/cmp" + "github.com/json-iterator/go" + "io/ioutil" + "kubesphere.io/kubesphere/pkg/simple/client/es/query" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestNewClient(t *testing.T) { + var tests = []struct { + fakeResp string + expected string + }{ + { + fakeResp: "es6_detect_version_major_200.json", + expected: ElasticV6, + }, + { + fakeResp: "es7_detect_version_major_200.json", + expected: ElasticV7, + }, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + es := mockElasticsearchService("/", test.fakeResp, http.StatusOK) + defer es.Close() + + client := &Client{host: es.URL} + err := client.loadClient() + if err != nil { + t.Fatal(err) + } + + if diff := cmp.Diff(client.version, test.expected); diff != "" { + t.Fatalf("%T differ (-got, +want): %s", test.expected, diff) + } + }) + } +} + +func TestClient_Search(t *testing.T) { + var tests = []struct { + fakeVersion string + fakeResp string + fakeCode int + expected string + expectedErr string + }{ + { + fakeVersion: ElasticV7, + fakeResp: "es7_search_200.json", + fakeCode: http.StatusOK, + expected: "es7_search_200_result.json", + }, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + var expected Response + err := JsonFromFile(test.expected, &expected) + if err != nil { + t.Fatal(err) + } + + srv := mockElasticsearchService("/ks-logstash*/_search", test.fakeResp, test.fakeCode) + defer srv.Close() + + c, err := NewClient(srv.URL, "ks-logstash", test.fakeVersion) + if err != nil { + t.Fatalf("create client error, %s", err) + } + result, err := c.Search(query.NewBuilder(), time.Time{}, time.Now(), false) + if test.expectedErr != "" { + if diff := cmp.Diff(fmt.Sprint(err), test.expectedErr); diff != "" { + t.Fatalf("%T differ (-got, +want): %s", test.expectedErr, diff) + } + } + if diff := cmp.Diff(result, &expected); diff != "" { + t.Fatalf("%T differ (-got, +want): %s", expected, diff) + } + }) + } +} + +func mockElasticsearchService(pattern, fakeResp string, fakeCode int) *httptest.Server { + mux := http.NewServeMux() + mux.HandleFunc(pattern, func(res http.ResponseWriter, req *http.Request) { + b, _ := ioutil.ReadFile(fmt.Sprintf("./testdata/%s", fakeResp)) + res.WriteHeader(fakeCode) + res.Write(b) + }) + return httptest.NewServer(mux) +} + +func JsonFromFile(expectedFile string, expectedJsonPtr interface{}) error { + json, err := ioutil.ReadFile(fmt.Sprintf("./testdata/%s", expectedFile)) + if err != nil { + return err + } + err = jsoniter.Unmarshal(json, expectedJsonPtr) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/simple/client/es/query/builder.go b/pkg/simple/client/es/query/builder.go new file mode 100644 index 0000000000000000000000000000000000000000..ec40a6077070b31f20321aae595e8234d0d93381 --- /dev/null +++ b/pkg/simple/client/es/query/builder.go @@ -0,0 +1,491 @@ +package query + +import ( + "github.com/json-iterator/go" + "reflect" +) + +// TODO: elastic/go-elasticsearch is working on Query DSL support. +// See https://github.com/elastic/go-elasticsearch/issues/42. +// We need refactor our query body builder when that is ready. +type Builder struct { + From int64 `json:"from,omitempty"` + Size int64 `json:"size,omitempty"` + Sorts []map[string]string `json:"sort,omitempty"` + *Query `json:",inline"` + *Aggregations `json:"aggs,omitempty"` +} + +func NewBuilder() *Builder { + return &Builder{} +} + +func (b *Builder) Bytes() ([]byte, error) { + return jsoniter.Marshal(b) +} + +func (b *Builder) WithQuery(q *Query) *Builder { + + if q == nil || q.Bool == nil || !q.IsValid() { + return b + } + + b.Query = q + return b +} + +func (b *Builder) WithAggregations(aggs *Aggregations) *Builder { + + b.Aggregations = aggs + return b +} + +func (b *Builder) WithFrom(n int64) *Builder { + b.From = n + return b +} + +func (b *Builder) WithSize(n int64) *Builder { + b.Size = n + return b +} + +func (b *Builder) WithSort(key, order string) *Builder { + if order == "" { + order = "desc" + } + b.Sorts = []map[string]string{{key: order}} + return b +} + +// Query + +type Query struct { + *Bool `json:"query,omitempty"` +} + +func NewQuery() *Query { + return &Query{} +} + +func (q *Query) WithBool(b *Bool) *Query { + if b == nil || !b.IsValid() { + return q + } + + q.Bool = b + return q +} + +// Aggregations + +type Aggregations struct { + *CardinalityAggregation `json:"cardinality_aggregation,omitempty"` + *DateHistogramAggregation `json:"date_histogram_aggregation,omitempty"` +} + +type CardinalityAggregation struct { + *Cardinality `json:"cardinality,omitempty"` +} + +type Cardinality struct { + Field string `json:"field,omitempty"` +} + +type DateHistogramAggregation struct { + *DateHistogram `json:"date_histogram,omitempty"` +} + +type DateHistogram struct { + Field string `json:"field,omitempty"` + Interval string `json:"interval,omitempty"` +} + +func NewAggregations() *Aggregations { + return &Aggregations{} +} + +func (a *Aggregations) WithCardinalityAggregation(field string) *Aggregations { + + a.CardinalityAggregation = &CardinalityAggregation{ + &Cardinality{ + Field: field, + }, + } + + return a +} + +func (a *Aggregations) WithDateHistogramAggregation(field string, interval string) *Aggregations { + + a.DateHistogramAggregation = &DateHistogramAggregation{ + &DateHistogram{ + Field: field, + Interval: interval, + }, + } + + return a +} + +type Item interface { + IsValid() bool +} + +// Example: +// {bool: {filter: <[]Match>}} +// {bool: {should: <[]Match>, minimum_should_match: 1}} +type Bool struct { + *Parameter `json:"bool,omitempty"` +} + +type Parameter struct { + Filter []interface{} `json:"filter,omitempty"` + Should []interface{} `json:"should,omitempty"` + MustNot []interface{} `json:"must_not,omitempty"` + MinimumShouldMatch int32 `json:"minimum_should_match,omitempty"` +} + +func NewBool() *Bool { + return &Bool{ + &Parameter{}, + } +} + +func (b *Bool) IsValid() bool { + if (b.Filter == nil || len(b.Filter) == 0) && + (b.Should == nil || len(b.Should) == 0) && + (b.MustNot == nil || len(b.MustNot) == 0) { + return false + } + + return true +} + +func (b *Bool) AppendFilter(item Item) *Bool { + + if reflect.ValueOf(item).IsNil() || !item.IsValid() { + return b + } + + b.Filter = append(b.Filter, item) + return b +} + +func (b *Bool) AppendMultiFilter(items []Item) *Bool { + + if items == nil || len(items) == 0 { + return b + } + + for _, item := range items { + if item.IsValid() { + b.Filter = append(b.Filter, item) + } + } + + return b +} + +func (b *Bool) AppendShould(item Item) *Bool { + + if reflect.ValueOf(item).IsNil() || !item.IsValid() { + return b + } + + b.Should = append(b.Should, item) + return b +} + +func (b *Bool) AppendMultiShould(items []Item) *Bool { + + if items == nil || len(items) == 0 { + return b + } + + for _, item := range items { + if item.IsValid() { + b.Should = append(b.Should, item) + } + } + return b +} + +func (b *Bool) AppendMustNot(item Item) *Bool { + + if reflect.ValueOf(item).IsNil() || !item.IsValid() { + return b + } + + b.MustNot = append(b.MustNot, item) + return b +} + +func (b *Bool) AppendMultiMustNot(items []Item) *Bool { + + if items == nil || len(items) == 0 { + return b + } + + for _, item := range items { + if item.IsValid() { + b.MustNot = append(b.MustNot, item) + } + } + return b +} + +func (b *Bool) WithMinimumShouldMatch(min int32) *Bool { + + b.MinimumShouldMatch = min + return b +} + +type MatchPhrase struct { + MatchPhrase map[string]string `json:"match_phrase,omitempty"` +} + +func (m *MatchPhrase) IsValid() bool { + + if m.MatchPhrase == nil || len(m.MatchPhrase) == 0 { + return false + } + + return true +} + +func NewMatchPhrase(key, val string) *MatchPhrase { + return &MatchPhrase{ + MatchPhrase: map[string]string{ + key: val, + }, + } +} + +func NewMultiMatchPhrase(key string, val []string) []Item { + + var array []Item + + if val == nil || len(val) == 0 { + return nil + } + + for _, v := range val { + array = append(array, &MatchPhrase{ + MatchPhrase: map[string]string{ + key: v, + }, + }) + } + + return array +} + +type MatchPhrasePrefix struct { + MatchPhrasePrefix map[string]string `json:"match_phrase_prefix,omitempty"` +} + +func (m *MatchPhrasePrefix) IsValid() bool { + + if m.MatchPhrasePrefix == nil || len(m.MatchPhrasePrefix) == 0 { + return false + } + + return true +} + +func NewMatchPhrasePrefix(key, val string) *MatchPhrasePrefix { + return &MatchPhrasePrefix{ + MatchPhrasePrefix: map[string]string{ + key: val, + }, + } +} + +func NewMultiMatchPhrasePrefix(key string, val []string) []Item { + + var array []Item + + if val == nil || len(val) == 0 { + return nil + } + + for _, v := range val { + array = append(array, &MatchPhrasePrefix{ + MatchPhrasePrefix: map[string]string{ + key: v, + }, + }) + } + + return array +} + +type Regexp struct { + Regexp map[string]string `json:"regexp,omitempty"` +} + +func (m *Regexp) IsValid() bool { + + if m.Regexp == nil || len(m.Regexp) == 0 { + return false + } + + return true +} + +func NewRegexp(key, val string) *Regexp { + return &Regexp{ + Regexp: map[string]string{ + key: val, + }, + } +} + +type Range struct { + Range map[string]map[string]interface{} `json:"range,omitempty"` +} + +func NewRange(key string) *Range { + return &Range{ + Range: map[string]map[string]interface{}{ + key: make(map[string]interface{}), + }, + } +} + +func (r *Range) WithGT(val interface{}) *Range { + r.withRange("gt", val) + return r +} + +func (r *Range) WithGTE(val interface{}) *Range { + r.withRange("gte", val) + return r +} + +func (r *Range) WithLT(val interface{}) *Range { + r.withRange("lt", val) + return r +} + +func (r *Range) WithLTE(val interface{}) *Range { + r.withRange("lte", val) + return r +} + +func (r *Range) IsValid() bool { + if r.Range == nil { + return false + } + + if len(r.Range) == 0 { + return false + } + + for _, v := range r.Range { + if len(v) != 0 { + return true + } + } + + return false +} + +func (r *Range) withRange(operator string, val interface{}) { + if r.Range == nil { + return + } + + for _, v := range r.Range { + v[operator] = val + } +} + +type Wildcard struct { + Wildcard map[string]string `json:"wildcard,omitempty"` +} + +func (m *Wildcard) IsValid() bool { + + if m.Wildcard == nil || len(m.Wildcard) == 0 { + return false + } + + return true +} + +func NewWildcard(key, val string) *Wildcard { + + return &Wildcard{ + Wildcard: map[string]string{ + key: val, + }, + } +} + +func NewMultiWildcard(key string, val []string) []Item { + + var array []Item + + if val == nil || len(val) == 0 { + return nil + } + + for _, v := range val { + array = append(array, &Wildcard{ + Wildcard: map[string]string{ + key: v, + }, + }) + } + + return array +} + +type Terms struct { + Terms map[string]interface{} `json:"terms,omitempty"` +} + +func (m *Terms) IsValid() bool { + + if m.Terms == nil || len(m.Terms) == 0 { + return false + } + + return true +} + +func NewTerms(key string, val interface{}) *Terms { + + if reflect.ValueOf(val).IsNil() { + return nil + } + + return &Terms{ + Terms: map[string]interface{}{ + key: val, + }, + } +} + +type Exists struct { + Exists map[string]string `json:"exists,omitempty"` +} + +func (m *Exists) IsValid() bool { + + if m.Exists == nil || len(m.Exists) == 0 { + return false + } + + return true +} + +func NewExists(key, val string) *Exists { + return &Exists{ + Exists: map[string]string{ + key: val, + }, + } +} diff --git a/pkg/simple/client/es/response.go b/pkg/simple/client/es/response.go new file mode 100644 index 0000000000000000000000000000000000000000..75674703feccc572a0c6cffeb0e8834b60962d8f --- /dev/null +++ b/pkg/simple/client/es/response.go @@ -0,0 +1,50 @@ +package es + +import ( + "github.com/json-iterator/go" + "k8s.io/klog" +) + +type Response struct { + ScrollId string `json:"_scroll_id,omitempty"` + Hits `json:"hits,omitempty"` + *Aggregations `json:"aggregations,omitempty"` +} + +type Hits struct { + Total interface{} `json:"total,omitempty"` // `As of Elasticsearch v7.x, hits.total is changed incompatibly + AllHits []Hit `json:"hits,omitempty"` +} + +type Hit struct { + Source interface{} `json:"_source,omitempty"` + Sort []int64 `json:"sort,omitempty"` +} + +type Aggregations struct { + *CardinalityAggregation `json:"cardinality_aggregation,omitempty"` + *DateHistogramAggregation `json:"date_histogram_aggregation,omitempty"` +} + +type CardinalityAggregation struct { + Value int64 `json:"value,omitempty"` +} + +type DateHistogramAggregation struct { + Buckets []Bucket `json:"buckets,omitempty"` +} + +type Bucket struct { + Key int64 `json:"key,omitempty"` + Count int64 `json:"doc_count,omitempty"` +} + +func parseResponse(body []byte) (*Response, error) { + var res Response + err := jsoniter.Unmarshal(body, &res) + if err != nil { + klog.Error(err) + return nil, err + } + return &res, nil +} diff --git a/pkg/simple/client/logging/elasticsearch/testdata/es6_detect_version_major_200.json b/pkg/simple/client/es/testdata/es6_detect_version_major_200.json similarity index 99% rename from pkg/simple/client/logging/elasticsearch/testdata/es6_detect_version_major_200.json rename to pkg/simple/client/es/testdata/es6_detect_version_major_200.json index f0ffed60480d66fcb8e0d47ad6653e43af029869..a808e3c4c7cf7f35f68f9f71432d329c111584e9 100644 --- a/pkg/simple/client/logging/elasticsearch/testdata/es6_detect_version_major_200.json +++ b/pkg/simple/client/es/testdata/es6_detect_version_major_200.json @@ -14,4 +14,4 @@ "minimum_index_compatibility_version" : "5.0.0" }, "tagline" : "You Know, for Search" -} \ No newline at end of file +} diff --git a/pkg/simple/client/logging/elasticsearch/testdata/es7_detect_version_major_200.json b/pkg/simple/client/es/testdata/es7_detect_version_major_200.json similarity index 99% rename from pkg/simple/client/logging/elasticsearch/testdata/es7_detect_version_major_200.json rename to pkg/simple/client/es/testdata/es7_detect_version_major_200.json index b2fc43429a0b3353b10b4e4501950bb8bfdd71b4..f22f99c6807b6681e8fc1f2467f76ff0bcfa7a7b 100644 --- a/pkg/simple/client/logging/elasticsearch/testdata/es7_detect_version_major_200.json +++ b/pkg/simple/client/es/testdata/es7_detect_version_major_200.json @@ -14,4 +14,4 @@ "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" -} \ No newline at end of file +} diff --git a/pkg/simple/client/es/testdata/es7_search_200.json b/pkg/simple/client/es/testdata/es7_search_200.json new file mode 100644 index 0000000000000000000000000000000000000000..c9b4c6c4fcd38372d33a78cd1b9ab41f494fcaa5 --- /dev/null +++ b/pkg/simple/client/es/testdata/es7_search_200.json @@ -0,0 +1,76 @@ +{ + "took": 772, + "timed_out": false, + "_shards": { + "total": 2, + "successful": 2, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": { + "value": 10000, + "relation": "gte" + }, + "max_score": 1.0, + "hits": [ + { + "_index": "ks-logstash-log-2020.05.16", + "_type": "flb_type", + "_id": "tRt2MXIBlcWZ594bqIUO", + "_score": 1.0, + "_source": { + "@timestamp": "2020-05-16T16:00:42.608Z", + "log": "10.233.30.76 redis-ha-announce-0.kubesphere-system.svc.cluster.local\n", + "time": "2020-05-16T16:00:42.608962452Z", + "kubernetes": { + "pod_name": "redis-ha-haproxy-ffb8d889d-8x9kj", + "namespace_name": "kubesphere-system", + "host": "master0", + "container_name": "config-init", + "docker_id": "a673327e5e3dfefca3e773273e69eca64baaa4499fdc04e6eb9d621ad8688ad0", + "container_hash": "cd4b3d4d27ae5931dc96b9632188590b7a6880469bcf07f478a3280dd0955336" + } + } + }, + { + "_index": "ks-logstash-log-2020.05.16", + "_type": "flb_type", + "_id": "tht2MXIBlcWZ594bqIUO", + "_score": 1.0, + "_source": { + "@timestamp": "2020-05-16T16:00:42.670Z", + "log": "10.233.30.204 redis-ha-announce-1.kubesphere-system.svc.cluster.local\n", + "time": "2020-05-16T16:00:42.670430525Z", + "kubernetes": { + "pod_name": "redis-ha-haproxy-ffb8d889d-8x9kj", + "namespace_name": "kubesphere-system", + "host": "master0", + "container_name": "config-init", + "docker_id": "a673327e5e3dfefca3e773273e69eca64baaa4499fdc04e6eb9d621ad8688ad0", + "container_hash": "cd4b3d4d27ae5931dc96b9632188590b7a6880469bcf07f478a3280dd0955336" + } + } + }, + { + "_index": "ks-logstash-log-2020.05.16", + "_type": "flb_type", + "_id": "txt2MXIBlcWZ594bqIUO", + "_score": 1.0, + "_source": { + "@timestamp": "2020-05-16T16:00:42.731Z", + "log": "scvg14005: inuse: 16, idle: 42, sys: 58, released: 40, consumed: 17 (MB)\n", + "time": "2020-05-16T16:00:42.731865428Z", + "kubernetes": { + "pod_name": "redis-ha-haproxy-ffb8d889d-8x9kj", + "namespace_name": "istio-system", + "host": "node0", + "container_name": "mixer", + "docker_id": "a673327e5e3dfefca3e773273e69eca64baaa4499fdc04e6eb9d621ad8688ad0", + "container_hash": "cd4b3d4d27ae5931dc96b9632188590b7a6880469bcf07f478a3280dd0955336" + } + } + } + ] + } +} diff --git a/pkg/simple/client/es/testdata/es7_search_200_result.json b/pkg/simple/client/es/testdata/es7_search_200_result.json new file mode 100644 index 0000000000000000000000000000000000000000..aa89d9bbf049e1cf989eab6e922e5ae392d0d85d --- /dev/null +++ b/pkg/simple/client/es/testdata/es7_search_200_result.json @@ -0,0 +1,55 @@ +{ + "hits": { + "total": { + "value": 10000, + "relation": "gte" + }, + "hits": [ + { + "_source": { + "time": "2020-05-16T16:00:42.608962452Z", + "kubernetes": { + "container_hash": "cd4b3d4d27ae5931dc96b9632188590b7a6880469bcf07f478a3280dd0955336", + "pod_name": "redis-ha-haproxy-ffb8d889d-8x9kj", + "namespace_name": "kubesphere-system", + "host": "master0", + "container_name": "config-init", + "docker_id": "a673327e5e3dfefca3e773273e69eca64baaa4499fdc04e6eb9d621ad8688ad0" + }, + "@timestamp": "2020-05-16T16:00:42.608Z", + "log": "10.233.30.76 redis-ha-announce-0.kubesphere-system.svc.cluster.local\n" + } + }, + { + "_source": { + "@timestamp": "2020-05-16T16:00:42.670Z", + "log": "10.233.30.204 redis-ha-announce-1.kubesphere-system.svc.cluster.local\n", + "time": "2020-05-16T16:00:42.670430525Z", + "kubernetes": { + "pod_name": "redis-ha-haproxy-ffb8d889d-8x9kj", + "namespace_name": "kubesphere-system", + "host": "master0", + "container_name": "config-init", + "docker_id": "a673327e5e3dfefca3e773273e69eca64baaa4499fdc04e6eb9d621ad8688ad0", + "container_hash": "cd4b3d4d27ae5931dc96b9632188590b7a6880469bcf07f478a3280dd0955336" + } + } + }, + { + "_source": { + "@timestamp": "2020-05-16T16:00:42.731Z", + "log": "scvg14005: inuse: 16, idle: 42, sys: 58, released: 40, consumed: 17 (MB)\n", + "time": "2020-05-16T16:00:42.731865428Z", + "kubernetes": { + "container_hash": "cd4b3d4d27ae5931dc96b9632188590b7a6880469bcf07f478a3280dd0955336", + "pod_name": "redis-ha-haproxy-ffb8d889d-8x9kj", + "namespace_name": "istio-system", + "host": "node0", + "container_name": "mixer", + "docker_id": "a673327e5e3dfefca3e773273e69eca64baaa4499fdc04e6eb9d621ad8688ad0" + } + } + } + ] + } +} diff --git a/pkg/simple/client/es/versions/interface.go b/pkg/simple/client/es/versions/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..f12263cd978cdefce5b065e878217319dc656603 --- /dev/null +++ b/pkg/simple/client/es/versions/interface.go @@ -0,0 +1,9 @@ +package versions + +// versioned es client interface +type Client interface { + Search(indices string, body []byte, scroll bool) ([]byte, error) + Scroll(id string) ([]byte, error) + ClearScroll(id string) + GetTotalHitCount(v interface{}) int64 +} diff --git a/pkg/simple/client/logging/elasticsearch/versions/v5/v5.go b/pkg/simple/client/es/versions/v5/v5.go similarity index 100% rename from pkg/simple/client/logging/elasticsearch/versions/v5/v5.go rename to pkg/simple/client/es/versions/v5/v5.go diff --git a/pkg/simple/client/logging/elasticsearch/versions/v6/v6.go b/pkg/simple/client/es/versions/v6/v6.go similarity index 100% rename from pkg/simple/client/logging/elasticsearch/versions/v6/v6.go rename to pkg/simple/client/es/versions/v6/v6.go diff --git a/pkg/simple/client/logging/elasticsearch/versions/v7/v7.go b/pkg/simple/client/es/versions/v7/v7.go similarity index 100% rename from pkg/simple/client/logging/elasticsearch/versions/v7/v7.go rename to pkg/simple/client/es/versions/v7/v7.go diff --git a/pkg/simple/client/events/elasticsearch/clients.go b/pkg/simple/client/events/elasticsearch/clients.go deleted file mode 100644 index ac5d2bc2685eddde654af91b65e4cdc21f2e655d..0000000000000000000000000000000000000000 --- a/pkg/simple/client/events/elasticsearch/clients.go +++ /dev/null @@ -1,166 +0,0 @@ -/* -Copyright 2020 KubeSphere Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package elasticsearch - -import ( - "fmt" - es5 "github.com/elastic/go-elasticsearch/v5" - es5api "github.com/elastic/go-elasticsearch/v5/esapi" - es6 "github.com/elastic/go-elasticsearch/v6" - es6api "github.com/elastic/go-elasticsearch/v6/esapi" - es7 "github.com/elastic/go-elasticsearch/v7" - es7api "github.com/elastic/go-elasticsearch/v7/esapi" - jsoniter "github.com/json-iterator/go" - "io" - "net/http" -) - -type Request struct { - Index string - Body io.Reader -} - -type Response struct { - Hits Hits `json:"hits"` - Aggregations map[string]jsoniter.RawMessage `json:"aggregations"` -} - -type Hits struct { - Total int64 `json:"total"` - Hits jsoniter.RawMessage `json:"hits"` -} - -type Error struct { - Type string `json:"type"` - Reason string `json:"reason"` - Status int `json:"status"` -} - -func (e Error) Error() string { - return fmt.Sprintf("%s %s: %s", http.StatusText(e.Status), e.Type, e.Reason) -} - -type ClientV5 es5.Client - -func (c *ClientV5) ExSearch(r *Request) (*Response, error) { - return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body), - c.Search.WithIgnoreUnavailable(true))) -} -func (c *ClientV5) parse(resp *es5api.Response, err error) (*Response, error) { - if err != nil { - return nil, fmt.Errorf("error getting response: %s", err) - } - defer resp.Body.Close() - if resp.IsError() { - return nil, fmt.Errorf(resp.String()) - } - var r struct { - Hits struct { - Total int64 `json:"total"` - Hits jsoniter.RawMessage `json:"hits"` - } `json:"hits"` - Aggregations map[string]jsoniter.RawMessage `json:"aggregations"` - } - if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { - return nil, fmt.Errorf("error parsing the response body: %s", err) - } - return &Response{ - Hits: Hits{Total: r.Hits.Total, Hits: r.Hits.Hits}, - Aggregations: r.Aggregations, - }, nil -} -func (c *ClientV5) Version() (string, error) { - res, err := c.Info() - if err != nil { - return "", err - } - defer res.Body.Close() - if res.IsError() { - return "", fmt.Errorf(res.String()) - } - var r map[string]interface{} - if err := json.NewDecoder(res.Body).Decode(&r); err != nil { - return "", fmt.Errorf("error parsing the response body: %s", err) - } - return fmt.Sprintf("%s", r["version"].(map[string]interface{})["number"]), nil -} - -type ClientV6 es6.Client - -func (c *ClientV6) ExSearch(r *Request) (*Response, error) { - return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body), - c.Search.WithIgnoreUnavailable(true))) -} -func (c *ClientV6) parse(resp *es6api.Response, err error) (*Response, error) { - if err != nil { - return nil, fmt.Errorf("error getting response: %s", err) - } - defer resp.Body.Close() - if resp.IsError() { - return nil, fmt.Errorf(resp.String()) - } - var r struct { - Hits struct { - Total int64 `json:"total"` - Hits jsoniter.RawMessage `json:"hits"` - } `json:"hits"` - Aggregations map[string]jsoniter.RawMessage `json:"aggregations"` - } - if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { - return nil, fmt.Errorf("error parsing the response body: %s", err) - } - return &Response{ - Hits: Hits{Total: r.Hits.Total, Hits: r.Hits.Hits}, - Aggregations: r.Aggregations, - }, nil -} - -type ClientV7 es7.Client - -func (c *ClientV7) ExSearch(r *Request) (*Response, error) { - return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body), - c.Search.WithIgnoreUnavailable(true))) -} -func (c *ClientV7) parse(resp *es7api.Response, err error) (*Response, error) { - if err != nil { - return nil, fmt.Errorf("error getting response: %s", err) - } - defer resp.Body.Close() - if resp.IsError() { - return nil, fmt.Errorf(resp.String()) - } - var r struct { - Hits struct { - Total struct { - Value int64 `json:"value"` - } `json:"total"` - Hits jsoniter.RawMessage `json:"hits"` - } `json:"hits"` - Aggregations map[string]jsoniter.RawMessage `json:"aggregations"` - } - if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { - return nil, fmt.Errorf("error parsing the response body: %s", err) - } - return &Response{ - Hits: Hits{Total: r.Hits.Total.Value, Hits: r.Hits.Hits}, - Aggregations: r.Aggregations, - }, nil -} - -type client interface { - ExSearch(r *Request) (*Response, error) -} diff --git a/pkg/simple/client/events/elasticsearch/elasticsearch.go b/pkg/simple/client/events/elasticsearch/elasticsearch.go index 91ad3c7236d388753ddf73cb5c80f42fd46728bf..b45ca151a4c07c38bc8cd2758a078b14c2646ed8 100644 --- a/pkg/simple/client/events/elasticsearch/elasticsearch.go +++ b/pkg/simple/client/events/elasticsearch/elasticsearch.go @@ -17,429 +17,159 @@ limitations under the License. package elasticsearch import ( - "bytes" "fmt" - "strings" - "sync" - "time" - - es5 "github.com/elastic/go-elasticsearch/v5" - es6 "github.com/elastic/go-elasticsearch/v6" - es7 "github.com/elastic/go-elasticsearch/v7" - jsoniter "github.com/json-iterator/go" - corev1 "k8s.io/api/core/v1" + "kubesphere.io/kubesphere/pkg/simple/client/es" + "kubesphere.io/kubesphere/pkg/simple/client/es/query" "kubesphere.io/kubesphere/pkg/simple/client/events" - "kubesphere.io/kubesphere/pkg/utils/esutil" ) -var json = jsoniter.ConfigCompatibleWithStandardLibrary +type client struct { + c *es.Client +} -type elasticsearch struct { - c client - opts struct { - indexPrefix string - } +func NewClient(options *events.Options) (events.Client, error) { + c := &client{} + + var err error + c.c, err = es.NewClient(options.Host, options.IndexPrefix, options.Version) + return c, err } -func (es *elasticsearch) SearchEvents(filter *events.Filter, from, size int64, +func (c *client) SearchEvents(filter *events.Filter, from, size int64, sort string) (*events.Events, error) { - queryPart := parseToQueryPart(filter) - if sort == "" { - sort = "desc" - } - sortPart := []map[string]interface{}{{ - "lastTimestamp": map[string]string{"order": sort}, - }} - b := map[string]interface{}{ - "from": from, - "size": size, - "query": queryPart, - "sort": sortPart, - } - body, err := json.Marshal(b) - if err != nil { - return nil, err - } - resp, err := es.c.ExSearch(&Request{ - Index: resolveIndexNames(es.opts.indexPrefix, filter.StartTime, filter.EndTime), - Body: bytes.NewBuffer(body), - }) + b := query.NewBuilder(). + WithQuery(parseToQueryPart(filter)). + WithSort("lastTimestamp", sort). + WithFrom(from). + WithSize(size) + + resp, err := c.c.Search(b, filter.StartTime, filter.EndTime, false) if err != nil { return nil, err } - if resp == nil || len(resp.Hits.Hits) == 0 { + if resp == nil || len(resp.AllHits) == 0 { return &events.Events{}, nil } - var innerHits []struct { - *corev1.Event `json:"_source"` - } - if err := json.Unmarshal(resp.Hits.Hits, &innerHits); err != nil { - return nil, err - } - evts := events.Events{Total: resp.Hits.Total} - for _, hit := range innerHits { - evts.Records = append(evts.Records, hit.Event) + evts := events.Events{Total: c.c.GetTotalHitCount(resp.Total)} + for _, hit := range resp.AllHits { + evts.Records = append(evts.Records, hit.Source) } return &evts, nil } -func (es *elasticsearch) CountOverTime(filter *events.Filter, interval string) (*events.Histogram, error) { +func (c *client) CountOverTime(filter *events.Filter, interval string) (*events.Histogram, error) { if interval == "" { interval = "15m" } - queryPart := parseToQueryPart(filter) - aggName := "events_count_over_lasttimestamp" - aggsPart := map[string]interface{}{ - aggName: map[string]interface{}{ - "date_histogram": map[string]string{ - "field": "lastTimestamp", - "interval": interval, - }, - }, - } - b := map[string]interface{}{ - "query": queryPart, - "aggs": aggsPart, - "size": 0, // do not get docs - } + b := query.NewBuilder(). + WithQuery(parseToQueryPart(filter)). + WithAggregations(query.NewAggregations(). + WithDateHistogramAggregation("lastTimestamp", interval)). + WithSize(0) - body, err := json.Marshal(b) - if err != nil { - return nil, err - } - resp, err := es.c.ExSearch(&Request{ - Index: resolveIndexNames(es.opts.indexPrefix, filter.StartTime, filter.EndTime), - Body: bytes.NewBuffer(body), - }) + resp, err := c.c.Search(b, filter.StartTime, filter.EndTime, false) if err != nil { return nil, err } - if resp == nil || resp.Aggregations == nil { + if resp == nil || resp.Aggregations.DateHistogramAggregation == nil { return &events.Histogram{}, nil } - raw, ok := resp.Aggregations[aggName] - if !ok || len(raw) == 0 { - return &events.Histogram{}, nil - } - var agg struct { - Buckets []struct { - KeyAsString string `json:"key_as_string"` - Key int64 `json:"key"` - DocCount int64 `json:"doc_count"` - } `json:"buckets"` - } - if err := json.Unmarshal(raw, &agg); err != nil { - return nil, err - } - histo := events.Histogram{Total: resp.Hits.Total} - for _, b := range agg.Buckets { + histo := events.Histogram{Total: c.c.GetTotalHitCount(resp.Total)} + for _, bucket := range resp.Buckets { histo.Buckets = append(histo.Buckets, - events.Bucket{Time: b.Key, Count: b.DocCount}) + events.Bucket{Time: bucket.Key, Count: bucket.Count}) } return &histo, nil } -func (es *elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.Statistics, error) { - queryPart := parseToQueryPart(filter) - aggName := "resources_count" - aggsPart := map[string]interface{}{ - aggName: map[string]interface{}{ - "cardinality": map[string]string{ - "field": "involvedObject.uid.keyword", - }, - }, - } - b := map[string]interface{}{ - "query": queryPart, - "aggs": aggsPart, - "size": 0, // do not get docs - } +func (c *client) StatisticsOnResources(filter *events.Filter) (*events.Statistics, error) { - body, err := json.Marshal(b) - if err != nil { - return nil, err - } - resp, err := es.c.ExSearch(&Request{ - Index: resolveIndexNames(es.opts.indexPrefix, filter.StartTime, filter.EndTime), - Body: bytes.NewBuffer(body), - }) + b := query.NewBuilder(). + WithQuery(parseToQueryPart(filter)). + WithAggregations(query.NewAggregations(). + WithCardinalityAggregation("involvedObject.uid.keyword")). + WithSize(0) + + resp, err := c.c.Search(b, filter.StartTime, filter.EndTime, false) if err != nil { return nil, err } - if resp == nil || resp.Aggregations == nil { - return &events.Statistics{}, nil - } - - raw, ok := resp.Aggregations[aggName] - if !ok || len(raw) == 0 { + if resp == nil || resp.Aggregations.CardinalityAggregation == nil { return &events.Statistics{}, nil } - var agg struct { - Value int64 `json:"value"` - } - if err := json.Unmarshal(raw, &agg); err != nil { - return nil, err - } return &events.Statistics{ - Resources: agg.Value, - Events: resp.Hits.Total, + Resources: resp.Value, + Events: c.c.GetTotalHitCount(resp.Total), }, nil } -func newClient(options *Options) (*elasticsearch, error) { - clientV5 := func() (*ClientV5, error) { - c, err := es5.NewClient(es5.Config{Addresses: []string{options.Host}}) - if err != nil { - return nil, err - } - return (*ClientV5)(c), nil - } - clientV6 := func() (*ClientV6, error) { - c, err := es6.NewClient(es6.Config{Addresses: []string{options.Host}}) - if err != nil { - return nil, err - } - return (*ClientV6)(c), nil - } - clientV7 := func() (*ClientV7, error) { - c, err := es7.NewClient(es7.Config{Addresses: []string{options.Host}}) - if err != nil { - return nil, err - } - return (*ClientV7)(c), nil +func parseToQueryPart(f *events.Filter) *query.Query { + if f == nil { + return nil } - var ( - version = options.Version - es = elasticsearch{} - err error - ) - es.opts.indexPrefix = options.IndexPrefix - - if options.Version == "" { - var c5 *ClientV5 - if c5, err = clientV5(); err == nil { - if version, err = c5.Version(); err == nil { - es.c = c5 - } - } - } - if err != nil { - return nil, err - } + var mini int32 = 1 + b := query.NewBool() - switch strings.Split(version, ".")[0] { - case "5": - if es.c == nil { - es.c, err = clientV5() + bi := query.NewBool().WithMinimumShouldMatch(mini) + for k, v := range f.InvolvedObjectNamespaceMap { + if k == "" { + bi.AppendShould(query.NewBool(). + AppendMustNot(query.NewExists("field", "involvedObject.namespace"))) + } else { + bi.AppendShould(query.NewBool(). + AppendFilter(query.NewMatchPhrase("involvedObject.namespace.keyword", k)). + AppendFilter(query.NewRange("lastTimestamp"). + WithGTE(v))) } - case "6": - es.c, err = clientV6() - case "7": - es.c, err = clientV7() - default: - err = fmt.Errorf("unsupported elasticsearch version %s", version) - } - if err != nil { - return nil, err } - return &es, nil -} - -type Elasticsearch struct { - innerEs *elasticsearch - options Options - mutex sync.Mutex -} - -func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64, - sort string) (*events.Events, error) { - ies, e := es.getInnerEs() - if e != nil { - return nil, e - } - return ies.SearchEvents(filter, from, size, sort) -} - -func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) (*events.Histogram, error) { - ies, e := es.getInnerEs() - if e != nil { - return nil, e - } - return ies.CountOverTime(filter, interval) -} + b.AppendFilter(bi) -func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.Statistics, error) { - ies, e := es.getInnerEs() - if e != nil { - return nil, e - } - return ies.StatisticsOnResources(filter) -} + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrase("involvedObject.name.keyword", f.InvolvedObjectNames)). + WithMinimumShouldMatch(mini)) -func (es *Elasticsearch) getInnerEs() (*elasticsearch, error) { - if es.innerEs != nil { - return es.innerEs, nil - } - es.mutex.Lock() - defer es.mutex.Unlock() - if es.innerEs != nil { - return es.innerEs, nil - } - ies, err := newClient(&es.options) - if err != nil { - return nil, err - } - es.innerEs = ies - return es.innerEs, nil -} + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrasePrefix("involvedObject.name", f.InvolvedObjectNameFuzzy)). + WithMinimumShouldMatch(mini)) -func NewClient(options *Options) (*Elasticsearch, error) { - return &Elasticsearch{options: *options}, nil -} + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrase("involvedObject.kind", f.InvolvedObjectkinds)). + WithMinimumShouldMatch(mini)) -func parseToQueryPart(f *events.Filter) interface{} { - if f == nil { - return nil - } - type BoolBody struct { - Filter []map[string]interface{} `json:"filter,omitempty"` - Should []map[string]interface{} `json:"should,omitempty"` - MinimumShouldMatch *int `json:"minimum_should_match,omitempty"` - MustNot []map[string]interface{} `json:"must_not,omitempty"` - } - var mini = 1 - b := BoolBody{} - queryBody := map[string]interface{}{ - "bool": &b, - } + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrase("reason", f.Reasons)). + WithMinimumShouldMatch(mini)) - if len(f.InvolvedObjectNamespaceMap) > 0 { - bi := BoolBody{MinimumShouldMatch: &mini} - for k, v := range f.InvolvedObjectNamespaceMap { - if k == "" { - bi.Should = append(bi.Should, map[string]interface{}{ - "bool": &BoolBody{ - MustNot: []map[string]interface{}{{ - "exists": map[string]string{"field": "involvedObject.namespace"}, - }}, - }, - }) - } else { - bi.Should = append(bi.Should, map[string]interface{}{ - "bool": &BoolBody{ - Filter: []map[string]interface{}{{ - "match_phrase": map[string]string{"involvedObject.namespace.keyword": k}, - }, { - "range": map[string]interface{}{ - "lastTimestamp": map[string]interface{}{ - "gte": v, - }, - }, - }}, - }, - }) - } - } - if len(bi.Should) > 0 { - b.Filter = append(b.Filter, map[string]interface{}{"bool": &bi}) - } + bi = query.NewBool().WithMinimumShouldMatch(mini) + for _, r := range f.ReasonFuzzy { + bi.AppendShould(query.NewWildcard("reason.keyword", fmt.Sprintf("*"+r+"*"))) } + b.AppendFilter(bi) - shouldBoolbody := func(mtype, fieldName string, fieldValues []string, fieldValueMutate func(string) string) *BoolBody { - bi := BoolBody{MinimumShouldMatch: &mini} - for _, v := range fieldValues { - if fieldValueMutate != nil { - v = fieldValueMutate(v) - } - bi.Should = append(bi.Should, map[string]interface{}{ - mtype: map[string]string{fieldName: v}, - }) - } - if len(bi.Should) == 0 { - return nil - } - return &bi - } + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrasePrefix("message", f.MessageFuzzy)). + WithMinimumShouldMatch(mini)) - if len(f.InvolvedObjectNames) > 0 { - if bi := shouldBoolbody("match_phrase", "involvedObject.name.keyword", - f.InvolvedObjectNames, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } - if len(f.InvolvedObjectNameFuzzy) > 0 { - if bi := shouldBoolbody("match_phrase_prefix", "involvedObject.name", - f.InvolvedObjectNameFuzzy, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } - if len(f.InvolvedObjectkinds) > 0 { - // involvedObject.kind is single word and here is not field keyword for case ignoring - if bi := shouldBoolbody("match_phrase", "involvedObject.kind", - f.InvolvedObjectkinds, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } - if len(f.Reasons) > 0 { - // reason is single word and here is not field keyword for case ignoring - if bi := shouldBoolbody("match_phrase", "reason", - f.Reasons, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } - if len(f.ReasonFuzzy) > 0 { - if bi := shouldBoolbody("wildcard", "reason", - f.ReasonFuzzy, func(s string) string { - return fmt.Sprintf("*" + s + "*") - }); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } - } - if len(f.MessageFuzzy) > 0 { - if bi := shouldBoolbody("match_phrase_prefix", "message", - f.MessageFuzzy, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } + if f.Type != "" { + b.AppendFilter(query.NewBool(). + AppendShould(query.NewMatchPhrase("type", f.Type))) } - if len(f.Type) > 0 { - // type is single word and here is not field keyword for case ignoring - if bi := shouldBoolbody("match_phrase", "type", - []string{f.Type}, nil); bi != nil { - b.Filter = append(b.Filter, map[string]interface{}{"bool": bi}) - } + r := query.NewRange("lastTimestamp") + if !f.StartTime.IsZero() { + r.WithGTE(f.StartTime) } - - if f.StartTime != nil || f.EndTime != nil { - m := make(map[string]*time.Time) - if f.StartTime != nil { - m["gte"] = f.StartTime - } - if f.EndTime != nil { - m["lte"] = f.EndTime - } - b.Filter = append(b.Filter, map[string]interface{}{ - "range": map[string]interface{}{"lastTimestamp": m}, - }) - + if !f.EndTime.IsZero() { + r.WithLTE(f.EndTime) } - return queryBody -} + b.AppendFilter(r) -func resolveIndexNames(prefix string, start, end *time.Time) string { - var s, e time.Time - if start != nil { - s = *start - } - if end != nil { - e = *end - } - return esutil.ResolveIndexNames(prefix, s, e) + return query.NewQuery().WithBool(b) } diff --git a/pkg/simple/client/events/elasticsearch/elasticsearch_test.go b/pkg/simple/client/events/elasticsearch/elasticsearch_test.go index e09d790bb0bb7c0fa88ba3515a6cd8fc0e04afb2..fa59261bc876a9a9287172a02f4168b017cf5edf 100644 --- a/pkg/simple/client/events/elasticsearch/elasticsearch_test.go +++ b/pkg/simple/client/events/elasticsearch/elasticsearch_test.go @@ -17,12 +17,12 @@ limitations under the License. package elasticsearch import ( + "encoding/json" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "kubesphere.io/kubesphere/pkg/simple/client/events" "net/http" "net/http/httptest" - "strconv" "strings" "testing" "time" @@ -69,7 +69,7 @@ func TestStatisticsOnResources(t *testing.T) { ] }, "aggregations": { - "resources_count": { + "cardinality_aggregation": { "value": 100 } } @@ -116,18 +116,21 @@ func TestStatisticsOnResources(t *testing.T) { mes := MockElasticsearchService("/", test.fakeCode, test.fakeResp) defer mes.Close() - es, err := NewClient(&Options{Host: mes.URL, IndexPrefix: "ks-logstash-events", Version: "6"}) - + c, err := NewClient(&events.Options{ + Host: mes.URL, + IndexPrefix: "ks-logstash-events", + Version: test.fakeVersion, + }) if err != nil { - t.Fatal(err) + t.Fatalf("create client error, %s", err) } - stats, err := es.StatisticsOnResources(&test.filter) + stats, err := c.StatisticsOnResources(&test.filter) if test.expectedError { if err == nil { t.Fatalf("expected err like %s", test.fakeResp) - } else if !strings.Contains(err.Error(), strconv.Itoa(test.fakeCode)) { + } else if !strings.Contains(err.Error(), "index_not_found_exception") { t.Fatalf("err does not contain expected code: %d", test.fakeCode) } } else { @@ -144,66 +147,68 @@ func TestStatisticsOnResources(t *testing.T) { func TestParseToQueryPart(t *testing.T) { q := ` { - "bool": { - "filter": [ - { - "bool": { - "should": [ - { - "bool": { - "filter": [ - { - "match_phrase": { - "involvedObject.namespace.keyword": "kubesphere-system" - } - }, - { - "range": { - "lastTimestamp": { - "gte": "2020-01-01T01:01:01.000000001Z" - } - } - } - ] - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "match_phrase_prefix": { - "involvedObject.name": "istio" - } - } - ], - "minimum_should_match": 1 - } - }, - { - "bool": { - "should": [ - { - "match_phrase": { - "reason": "unhealthy" - } - } - ], - "minimum_should_match": 1 - } - }, - { - "range": { - "lastTimestamp": { - "gte": "2019-12-01T01:01:01.000000001Z" - } - } - } - ] - } + "query":{ + "bool":{ + "filter":[ + { + "bool":{ + "should":[ + { + "bool":{ + "filter":[ + { + "match_phrase":{ + "involvedObject.namespace.keyword":"kubesphere-system" + } + }, + { + "range":{ + "lastTimestamp":{ + "gte":"2020-01-01T01:01:01.000000001Z" + } + } + } + ] + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "match_phrase_prefix":{ + "involvedObject.name":"istio" + } + } + ], + "minimum_should_match":1 + } + }, + { + "bool":{ + "should":[ + { + "match_phrase":{ + "reason":"unhealthy" + } + } + ], + "minimum_should_match":1 + } + }, + { + "range":{ + "lastTimestamp":{ + "gte":"2019-12-01T01:01:01.000000001Z" + } + } + } + ] + } + } } ` nsCreateTime := time.Date(2020, time.Month(1), 1, 1, 1, 1, 1, time.UTC) @@ -215,7 +220,7 @@ func TestParseToQueryPart(t *testing.T) { }, InvolvedObjectNameFuzzy: []string{"istio"}, Reasons: []string{"unhealthy"}, - StartTime: &startTime, + StartTime: startTime, } qp := parseToQueryPart(filter) diff --git a/pkg/simple/client/events/interface.go b/pkg/simple/client/events/interface.go index ceecc0133ece87cb259f739fb40d9ebffac5f8dc..e5fd8ec4d88f56310109eb5200c54334e1ef5be7 100644 --- a/pkg/simple/client/events/interface.go +++ b/pkg/simple/client/events/interface.go @@ -17,7 +17,6 @@ limitations under the License. package events import ( - v1 "k8s.io/api/core/v1" "time" ) @@ -36,13 +35,13 @@ type Filter struct { ReasonFuzzy []string MessageFuzzy []string Type string - StartTime *time.Time - EndTime *time.Time + StartTime time.Time + EndTime time.Time } type Events struct { - Total int64 `json:"total" description:"total number of matched results"` - Records []*v1.Event `json:"records" description:"actual array of results"` + Total int64 `json:"total" description:"total number of matched results"` + Records []interface{} `json:"records" description:"actual array of results"` } type Histogram struct { diff --git a/pkg/simple/client/events/elasticsearch/options.go b/pkg/simple/client/events/options.go similarity index 85% rename from pkg/simple/client/events/elasticsearch/options.go rename to pkg/simple/client/events/options.go index f8485a4482ed3db8c20e32072d9d819dcf45ad36..66a958462af95b876ea91c4604945f234f1e0f96 100644 --- a/pkg/simple/client/events/elasticsearch/options.go +++ b/pkg/simple/client/events/options.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package elasticsearch +package events import ( "github.com/spf13/pflag" @@ -27,7 +27,7 @@ type Options struct { Version string `json:"version" yaml:"version"` } -func NewElasticSearchOptions() *Options { +func NewEventsOptions() *Options { return &Options{ Host: "", IndexPrefix: "ks-logstash-events", @@ -42,21 +42,21 @@ func (s *Options) ApplyTo(options *Options) { } func (s *Options) Validate() []error { - errs := []error{} + errs := make([]error, 0) return errs } func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) { - fs.StringVar(&s.Host, "elasticsearch-host", c.Host, ""+ + fs.StringVar(&s.Host, "events-elasticsearch-host", c.Host, ""+ "Elasticsearch service host. KubeSphere is using elastic as event store, "+ "if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+ " the following elastic search options will be ignored.") - fs.StringVar(&s.IndexPrefix, "index-prefix", c.IndexPrefix, ""+ + fs.StringVar(&s.IndexPrefix, "events-index-prefix", c.IndexPrefix, ""+ "Index name prefix. KubeSphere will retrieve events against indices matching the prefix.") - fs.StringVar(&s.Version, "elasticsearch-version", c.Version, ""+ + fs.StringVar(&s.Version, "events-elasticsearch-version", c.Version, ""+ "Elasticsearch major version, e.g. 5/6/7, if left blank, will detect automatically."+ "Currently, minimum supported version is 5.x") } diff --git a/pkg/simple/client/logging/elasticsearch/api_body.go b/pkg/simple/client/logging/elasticsearch/api_body.go deleted file mode 100644 index 59850472a27c2d1f34beefb427efed6ff9b7b78a..0000000000000000000000000000000000000000 --- a/pkg/simple/client/logging/elasticsearch/api_body.go +++ /dev/null @@ -1,238 +0,0 @@ -/* -Copyright 2020 KubeSphere Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package elasticsearch - -import ( - "fmt" - "github.com/json-iterator/go" - "k8s.io/klog" - "kubesphere.io/kubesphere/pkg/simple/client/logging" -) - -const ( - podNameMaxLength = 63 - podNameSuffixLength = 6 // 5 characters + 1 hyphen - replicaSetSuffixMaxLength = 11 // max 10 characters + 1 hyphen -) - -// TODO: elastic/go-elasticsearch is working on Query DSL support. -// See https://github.com/elastic/go-elasticsearch/issues/42. -// We need refactor our query body builder when that is ready. -type bodyBuilder struct { - Body -} - -func newBodyBuilder() *bodyBuilder { - return &bodyBuilder{} -} - -func (bb *bodyBuilder) bytes() ([]byte, error) { - return jsoniter.Marshal(bb.Body) -} - -func (bb *bodyBuilder) mainBool(sf logging.SearchFilter) *bodyBuilder { - var ms []Match - - // literal matching - if len(sf.NamespaceFilter) != 0 { - var b Bool - for ns := range sf.NamespaceFilter { - var match Match - if ct := sf.NamespaceFilter[ns]; ct != nil { - match = Match{ - Bool: &Bool{ - Filter: []Match{ - { - MatchPhrase: map[string]string{ - "kubernetes.namespace_name.keyword": ns, - }, - }, - { - Range: &Range{ - Time: &Time{ - Gte: ct, - }, - }, - }, - }, - }, - } - } else { - match = Match{ - Bool: &Bool{ - Filter: []Match{ - { - MatchPhrase: map[string]string{ - "kubernetes.namespace_name.keyword": ns, - }, - }, - }, - }, - } - } - b.Should = append(b.Should, match) - } - b.MinimumShouldMatch = 1 - ms = append(ms, Match{Bool: &b}) - } - if sf.WorkloadFilter != nil { - var b Bool - for _, wk := range sf.WorkloadFilter { - b.Should = append(b.Should, Match{Regexp: map[string]string{"kubernetes.pod_name.keyword": podNameRegexp(wk)}}) - } - b.MinimumShouldMatch = 1 - ms = append(ms, Match{Bool: &b}) - } - if sf.PodFilter != nil { - var b Bool - for _, po := range sf.PodFilter { - b.Should = append(b.Should, Match{MatchPhrase: map[string]string{"kubernetes.pod_name.keyword": po}}) - } - b.MinimumShouldMatch = 1 - ms = append(ms, Match{Bool: &b}) - } - if sf.ContainerFilter != nil { - var b Bool - for _, c := range sf.ContainerFilter { - b.Should = append(b.Should, Match{MatchPhrase: map[string]string{"kubernetes.container_name.keyword": c}}) - } - b.MinimumShouldMatch = 1 - ms = append(ms, Match{Bool: &b}) - } - - // fuzzy matching - if sf.WorkloadSearch != nil { - var b Bool - for _, wk := range sf.WorkloadSearch { - b.Should = append(b.Should, Match{MatchPhrasePrefix: map[string]string{"kubernetes.pod_name": wk}}) - } - - b.MinimumShouldMatch = 1 - ms = append(ms, Match{Bool: &b}) - } - if sf.PodSearch != nil { - var b Bool - for _, po := range sf.PodSearch { - b.Should = append(b.Should, Match{MatchPhrasePrefix: map[string]string{"kubernetes.pod_name": po}}) - } - b.MinimumShouldMatch = 1 - ms = append(ms, Match{Bool: &b}) - } - if sf.ContainerSearch != nil { - var b Bool - for _, c := range sf.ContainerSearch { - b.Should = append(b.Should, Match{MatchPhrasePrefix: map[string]string{"kubernetes.container_name": c}}) - } - b.MinimumShouldMatch = 1 - ms = append(ms, Match{Bool: &b}) - } - if sf.LogSearch != nil { - var b Bool - for _, l := range sf.LogSearch { - b.Should = append(b.Should, Match{MatchPhrasePrefix: map[string]string{"log": l}}) - } - b.MinimumShouldMatch = 1 - ms = append(ms, Match{Bool: &b}) - } - - r := &Range{Time: &Time{}} - if !sf.Starttime.IsZero() { - r.Gte = &sf.Starttime - } - if !sf.Endtime.IsZero() { - r.Lte = &sf.Endtime - } - if r.Lte != nil || r.Gte != nil { - ms = append(ms, Match{Range: r}) - } - - bb.Body.Query = &Query{Bool{Filter: ms}} - return bb -} - -func (bb *bodyBuilder) cardinalityAggregation() *bodyBuilder { - bb.Body.Aggs = &Aggs{ - CardinalityAggregation: &CardinalityAggregation{ - &Cardinality{ - Field: "kubernetes.docker_id.keyword", - }, - }, - } - return bb -} - -func (bb *bodyBuilder) dateHistogramAggregation(interval string) *bodyBuilder { - bb.Body.Aggs = &Aggs{ - DateHistogramAggregation: &DateHistogramAggregation{ - &DateHistogram{ - Field: "time", - Interval: interval, - }, - }, - } - return bb -} - -func (bb *bodyBuilder) from(n int64) *bodyBuilder { - bb.From = n - return bb -} - -func (bb *bodyBuilder) size(n int64) *bodyBuilder { - bb.Size = n - return bb -} - -func (bb *bodyBuilder) sort(order string) *bodyBuilder { - bb.Sorts = []map[string]string{{"time": order}} - return bb -} - -func podNameRegexp(workloadName string) string { - var regexp string - if len(workloadName) <= podNameMaxLength-replicaSetSuffixMaxLength-podNameSuffixLength { - // match deployment pods, eg. <deploy>-579dfbcddd-24znw - // replicaset rand string is limited to vowels - // https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/rand/rand.go#L83 - regexp += workloadName + "-[bcdfghjklmnpqrstvwxz2456789]{1,10}-[a-z0-9]{5}|" - // match statefulset pods, eg. <sts>-0 - regexp += workloadName + "-[0-9]+|" - // match pods of daemonset or job, eg. <ds>-29tdk, <job>-5xqvl - regexp += workloadName + "-[a-z0-9]{5}" - } else if len(workloadName) <= podNameMaxLength-podNameSuffixLength { - replicaSetSuffixLength := podNameMaxLength - podNameSuffixLength - len(workloadName) - regexp += fmt.Sprintf("%s%d%s", workloadName+"-[bcdfghjklmnpqrstvwxz2456789]{", replicaSetSuffixLength, "}[a-z0-9]{5}|") - regexp += workloadName + "-[0-9]+|" - regexp += workloadName + "-[a-z0-9]{5}" - } else { - // Rand suffix may overwrites the workload name if the name is too long - // This won't happen for StatefulSet because long name will cause ReplicaSet fails during StatefulSet creation. - regexp += workloadName[:podNameMaxLength-podNameSuffixLength+1] + "[a-z0-9]{5}|" - regexp += workloadName + "-[0-9]+" - } - return regexp -} - -func parseResponse(body []byte) (Response, error) { - var res Response - err := jsoniter.Unmarshal(body, &res) - if err != nil { - klog.Error(err) - return Response{}, err - } - return res, nil -} diff --git a/pkg/simple/client/logging/elasticsearch/api_body_test.go b/pkg/simple/client/logging/elasticsearch/api_body_test.go deleted file mode 100644 index c8281da692492ad0819134bc2903a35f89751229..0000000000000000000000000000000000000000 --- a/pkg/simple/client/logging/elasticsearch/api_body_test.go +++ /dev/null @@ -1,147 +0,0 @@ -/* -Copyright 2020 KubeSphere Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package elasticsearch - -import ( - "fmt" - "github.com/google/go-cmp/cmp" - "kubesphere.io/kubesphere/pkg/simple/client/logging" - "reflect" - "testing" - "time" -) - -func TestMainBool(t *testing.T) { - var tests = []struct { - filter logging.SearchFilter - expected string - }{ - { - filter: logging.SearchFilter{ - NamespaceFilter: map[string]*time.Time{ - "default": func() *time.Time { t := time.Unix(1589981934, 0); return &t }(), - }, - }, - expected: "api_body_1.json", - }, - { - filter: logging.SearchFilter{ - WorkloadFilter: []string{"mysql"}, - Starttime: time.Unix(1589980934, 0), - Endtime: time.Unix(1589981934, 0), - }, - expected: "api_body_2.json", - }, - { - filter: logging.SearchFilter{ - PodFilter: []string{"mysql"}, - PodSearch: []string{"mysql-a8w3s-10945j"}, - LogSearch: []string{"info"}, - }, - expected: "api_body_3.json", - }, - { - filter: logging.SearchFilter{ - ContainerFilter: []string{"mysql-1"}, - ContainerSearch: []string{"mysql-3"}, - }, - expected: "api_body_4.json", - }, - { - filter: logging.SearchFilter{ - Starttime: time.Unix(1590744676, 0), - }, - expected: "api_body_7.json", - }, - { - filter: logging.SearchFilter{ - NamespaceFilter: map[string]*time.Time{ - "default": nil, - }, - }, - expected: "api_body_8.json", - }, - } - - for i, test := range tests { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - var expected Body - err := JsonFromFile(test.expected, &expected) - if err != nil { - t.Fatal(err) - } - - result := newBodyBuilder().mainBool(test.filter).Body - - if diff := cmp.Diff(result, expected); diff != "" { - fmt.Printf("%T differ (-got, +want): %s", expected, diff) - } - }) - } -} - -func TestCardinalityAggregation(t *testing.T) { - var tests = []struct { - expected string - }{ - { - expected: "api_body_5.json", - }, - } - - for i, test := range tests { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - var expected Body - err := JsonFromFile(test.expected, &expected) - if err != nil { - t.Fatal(err) - } - - result := newBodyBuilder().cardinalityAggregation().Body - - if !reflect.DeepEqual(result, expected) { - t.Fatalf("expected: %v, but got %v", expected, result) - } - }) - } -} - -func TestDateHistogramAggregation(t *testing.T) { - var tests = []struct { - expected string - }{ - { - expected: "api_body_6.json", - }, - } - - for i, test := range tests { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - var expected Body - err := JsonFromFile(test.expected, &expected) - if err != nil { - t.Fatal(err) - } - - result := newBodyBuilder().dateHistogramAggregation("15m").Body - - if !reflect.DeepEqual(result, expected) { - t.Fatalf("expected: %v, but got %v", expected, result) - } - }) - } -} diff --git a/pkg/simple/client/logging/elasticsearch/api_body_types.go b/pkg/simple/client/logging/elasticsearch/api_body_types.go deleted file mode 100644 index 3baca851b82e899ee8e6cb35babee17bfc1fa46e..0000000000000000000000000000000000000000 --- a/pkg/simple/client/logging/elasticsearch/api_body_types.go +++ /dev/null @@ -1,143 +0,0 @@ -/* -Copyright 2020 KubeSphere Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package elasticsearch - -import "time" - -// --------------------------------------------- Request Body --------------------------------------------- - -// More info: https://www.elastic.co/guide/en/elasticsearch/reference/current/getting-started-search-API.html -type Body struct { - From int64 `json:"from,omitempty"` - Size int64 `json:"size,omitempty"` - Sorts []map[string]string `json:"sort,omitempty"` - *Query `json:"query,omitempty"` - *Aggs `json:"aggs,omitempty"` -} - -type Query struct { - Bool `json:"bool,omitempty"` -} - -// Example: -// {bool: {filter: <[]Match>}} -// {bool: {should: <[]Match>, minimum_should_match: 1}} -type Bool struct { - Filter []Match `json:"filter,omitempty"` - Should []Match `json:"should,omitempty"` - MinimumShouldMatch int32 `json:"minimum_should_match,omitempty"` -} - -// Example: []Match -// [ -// { -// bool: <Bool> -// }, -// { -// match_phrase: { -// <string>: <string> -// } -// }, -// ... -// ] -type Match struct { - *Bool `json:"bool,omitempty"` - MatchPhrase map[string]string `json:"match_phrase,omitempty"` - MatchPhrasePrefix map[string]string `json:"match_phrase_prefix,omitempty"` - Regexp map[string]string `json:"regexp,omitempty"` - *Range `json:"range,omitempty"` -} - -type Range struct { - *Time `json:"time,omitempty"` -} - -type Time struct { - Gte *time.Time `json:"gte,omitempty"` - Lte *time.Time `json:"lte,omitempty"` -} - -type Aggs struct { - *CardinalityAggregation `json:"container_count,omitempty"` - *DateHistogramAggregation `json:"log_count_over_time,omitempty"` -} - -type CardinalityAggregation struct { - *Cardinality `json:"cardinality,omitempty"` -} - -type Cardinality struct { - Field string `json:"field,omitempty"` -} - -type DateHistogramAggregation struct { - *DateHistogram `json:"date_histogram,omitempty"` -} - -type DateHistogram struct { - Field string `json:"field,omitempty"` - Interval string `json:"interval,omitempty"` -} - -// --------------------------------------------- Response Body --------------------------------------------- - -type Response struct { - ScrollId string `json:"_scroll_id,omitempty"` - Hits `json:"hits,omitempty"` - Aggregations `json:"aggregations,omitempty"` -} - -type Hits struct { - Total interface{} `json:"total"` // `As of Elasticsearch v7.x, hits.total is changed incompatibly - AllHits []Hit `json:"hits"` -} - -type Hit struct { - Source `json:"_source"` - Sort []int64 `json:"sort"` -} - -type Source struct { - Log string `json:"log"` - Time string `json:"time"` - Kubernetes `json:"kubernetes"` -} - -type Kubernetes struct { - Namespace string `json:"namespace_name"` - Pod string `json:"pod_name"` - Container string `json:"container_name"` - Host string `json:"host"` -} - -type Aggregations struct { - ContainerCount `json:"container_count"` - LogCountOverTime `json:"log_count_over_time"` -} - -type ContainerCount struct { - Value int64 `json:"value"` -} - -type LogCountOverTime struct { - Buckets []Bucket `json:"buckets"` -} - -type Bucket struct { - Time int64 `json:"key"` - Count int64 `json:"doc_count"` -} diff --git a/pkg/simple/client/logging/elasticsearch/elasticsearch.go b/pkg/simple/client/logging/elasticsearch/elasticsearch.go index b3c9d64c9d21fd474b377337db3871da1f2e8599..aa192799eb4a656250e82d299e9cfef885d893be 100644 --- a/pkg/simple/client/logging/elasticsearch/elasticsearch.go +++ b/pkg/simple/client/logging/elasticsearch/elasticsearch.go @@ -18,288 +18,153 @@ package elasticsearch import ( "bytes" - "context" + "encoding/json" "fmt" - "github.com/json-iterator/go" "io" + "kubesphere.io/kubesphere/pkg/simple/client/es" + "kubesphere.io/kubesphere/pkg/simple/client/es/query" "kubesphere.io/kubesphere/pkg/simple/client/logging" - "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v5" - "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v6" - "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v7" - "kubesphere.io/kubesphere/pkg/utils/esutil" + "time" + "kubesphere.io/kubesphere/pkg/utils/stringutils" - "strings" - "sync" ) const ( - ElasticV5 = "5" - ElasticV6 = "6" - ElasticV7 = "7" + podNameMaxLength = 63 + podNameSuffixLength = 6 // 5 characters + 1 hyphen + replicaSetSuffixMaxLength = 11 // max 10 characters + 1 hyphen ) -// Elasticsearch implement logging interface -type Elasticsearch struct { - host string - version string - index string - - c client - mux sync.Mutex +type Source struct { + Log string `json:"log"` + Time string `json:"time"` + Kubernetes `json:"kubernetes"` } -// versioned es client interface -type client interface { - Search(indices string, body []byte, scroll bool) ([]byte, error) - Scroll(id string) ([]byte, error) - ClearScroll(id string) - GetTotalHitCount(v interface{}) int64 +type Kubernetes struct { + Namespace string `json:"namespace_name"` + Pod string `json:"pod_name"` + Container string `json:"container_name"` + Host string `json:"host"` } -func NewElasticsearch(options *Options) (*Elasticsearch, error) { - var err error - es := &Elasticsearch{ - host: options.Host, - version: options.Version, - index: options.IndexPrefix, - } - - switch es.version { - case ElasticV5: - es.c, err = v5.New(es.host, es.index) - case ElasticV6: - es.c, err = v6.New(es.host, es.index) - case ElasticV7: - es.c, err = v7.New(es.host, es.index) - case "": - es.c = nil - default: - return nil, fmt.Errorf("unsupported elasticsearch version %s", es.version) - } - - return es, err +// Elasticsearch implement logging interface +type client struct { + c *es.Client } -func (es *Elasticsearch) loadClient() error { - // Check if Elasticsearch client has been initialized. - if es.c != nil { - return nil - } - - // Create Elasticsearch client. - es.mux.Lock() - defer es.mux.Unlock() - - if es.c != nil { - return nil - } - - // Detect Elasticsearch server version using Info API. - // Info API is backward compatible across v5, v6 and v7. - esv6, err := v6.New(es.host, "") - if err != nil { - return err - } - - res, err := esv6.Client.Info( - esv6.Client.Info.WithContext(context.Background()), - ) - if err != nil { - return err - } - defer res.Body.Close() - - var b map[string]interface{} - if err = jsoniter.NewDecoder(res.Body).Decode(&b); err != nil { - return err - } - if res.IsError() { - // Print the response status and error information. - e, _ := b["error"].(map[string]interface{}) - return fmt.Errorf("[%s] type: %v, reason: %v", res.Status(), e["type"], e["reason"]) - } - - // get the major version - version, _ := b["version"].(map[string]interface{}) - number, _ := version["number"].(string) - if number == "" { - return fmt.Errorf("failed to detect elastic version number") - } - - var c client - v := strings.Split(number, ".")[0] - switch v { - case ElasticV5: - c, err = v5.New(es.host, es.index) - case ElasticV6: - c, err = v6.New(es.host, es.index) - case ElasticV7: - c, err = v7.New(es.host, es.index) - default: - err = fmt.Errorf("unsupported elasticsearch version %s", version) - } +func NewClient(options *logging.Options) (logging.Client, error) { - if err != nil { - return err - } + c := &client{} - es.c = c - es.version = v - return nil + var err error + c.c, err = es.NewClient(options.Host, options.IndexPrefix, options.Version) + return c, err } -func (es *Elasticsearch) GetCurrentStats(sf logging.SearchFilter) (logging.Statistics, error) { +func (c *client) GetCurrentStats(sf logging.SearchFilter) (logging.Statistics, error) { var err error - err = es.loadClient() - if err != nil { - return logging.Statistics{}, err - } - - body, err := newBodyBuilder(). - mainBool(sf). - cardinalityAggregation(). - bytes() - if err != nil { - return logging.Statistics{}, err - } + b := query.NewBuilder(). + WithQuery(parseToQueryPart(sf)). + WithAggregations(query.NewAggregations(). + WithCardinalityAggregation("kubernetes.docker_id.keyword")). + WithSize(0) - b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, true) - if err != nil { - return logging.Statistics{}, err - } - - res, err := parseResponse(b) + resp, err := c.c.Search(b, sf.Starttime, sf.Endtime, false) if err != nil { return logging.Statistics{}, err } return logging.Statistics{ - Containers: res.Value, - Logs: es.c.GetTotalHitCount(res.Total), + Containers: resp.Value, + Logs: c.c.GetTotalHitCount(resp.Total), }, nil } -func (es *Elasticsearch) CountLogsByInterval(sf logging.SearchFilter, interval string) (logging.Histogram, error) { - var err error +func (c *client) CountLogsByInterval(sf logging.SearchFilter, interval string) (logging.Histogram, error) { - err = es.loadClient() - if err != nil { - return logging.Histogram{}, err - } - - body, err := newBodyBuilder(). - mainBool(sf). - dateHistogramAggregation(interval). - bytes() - if err != nil { - return logging.Histogram{}, err - } + b := query.NewBuilder(). + WithQuery(parseToQueryPart(sf)). + WithAggregations(query.NewAggregations(). + WithDateHistogramAggregation("time", interval)). + WithSize(0) - b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, false) + resp, err := c.c.Search(b, sf.Starttime, sf.Endtime, false) if err != nil { return logging.Histogram{}, err } - res, err := parseResponse(b) - if err != nil { - return logging.Histogram{}, err + h := logging.Histogram{ + Total: c.c.GetTotalHitCount(resp.Total), } - - var h logging.Histogram - h.Total = es.c.GetTotalHitCount(res.Total) - for _, b := range res.Buckets { + for _, bucket := range resp.Buckets { h.Buckets = append(h.Buckets, logging.Bucket{ - Time: b.Time, - Count: b.Count, + Time: bucket.Key, + Count: bucket.Count, }) } return h, nil } -func (es *Elasticsearch) SearchLogs(sf logging.SearchFilter, f, s int64, o string) (logging.Logs, error) { - var err error - - err = es.loadClient() - if err != nil { - return logging.Logs{}, err - } +func (c *client) SearchLogs(sf logging.SearchFilter, f, s int64, o string) (logging.Logs, error) { - body, err := newBodyBuilder(). - mainBool(sf). - from(f). - size(s). - sort(o). - bytes() - if err != nil { - return logging.Logs{}, err - } + b := query.NewBuilder(). + WithQuery(parseToQueryPart(sf)). + WithSort("time", o). + WithFrom(f). + WithSize(s) - b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, false) + resp, err := c.c.Search(b, sf.Starttime, sf.Endtime, false) if err != nil { return logging.Logs{}, err } - res, err := parseResponse(b) - if err != nil { - return logging.Logs{}, err + l := logging.Logs{ + Total: c.c.GetTotalHitCount(resp.Total), } - var l logging.Logs - l.Total = es.c.GetTotalHitCount(res.Total) - for _, hit := range res.AllHits { + for _, hit := range resp.AllHits { + s := c.getSource(hit.Source) l.Records = append(l.Records, logging.Record{ - Log: hit.Log, - Time: hit.Time, - Namespace: hit.Namespace, - Pod: hit.Pod, - Container: hit.Container, + Log: s.Log, + Time: s.Time, + Namespace: s.Namespace, + Pod: s.Pod, + Container: s.Container, }) } return l, nil } -func (es *Elasticsearch) ExportLogs(sf logging.SearchFilter, w io.Writer) error { - var err error +func (c *client) ExportLogs(sf logging.SearchFilter, w io.Writer) error { + var id string var data []string - err = es.loadClient() - if err != nil { - return err - } + b := query.NewBuilder(). + WithQuery(parseToQueryPart(sf)). + WithSort("time", "desc"). + WithFrom(0). + WithSize(1000) - // Initial Search - body, err := newBodyBuilder(). - mainBool(sf). - from(0). - size(1000). - sort("desc"). - bytes() + resp, err := c.c.Search(b, sf.Starttime, sf.Endtime, true) if err != nil { return err } - b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, true) - defer es.ClearScroll(id) - if err != nil { - return err - } - res, err := parseResponse(b) - if err != nil { - return err - } + defer c.c.ClearScroll(id) - id = res.ScrollId - for _, hit := range res.AllHits { - data = append(data, hit.Log) + id = resp.ScrollId + for _, hit := range resp.AllHits { + data = append(data, c.getSource(hit.Source).Log) } // limit to retrieve max 100k records for i := 0; i < 100; i++ { if i != 0 { - data, id, err = es.scroll(id) + data, id, err = c.scroll(id) if err != nil { return err } @@ -320,26 +185,122 @@ func (es *Elasticsearch) ExportLogs(sf logging.SearchFilter, w io.Writer) error return nil } -func (es *Elasticsearch) scroll(id string) ([]string, string, error) { - b, err := es.c.Scroll(id) +func (c *client) scroll(id string) ([]string, string, error) { + resp, err := c.c.Scroll(id) if err != nil { return nil, id, err } - res, err := parseResponse(b) + var data []string + for _, hit := range resp.AllHits { + data = append(data, c.getSource(hit.Source).Log) + } + return data, resp.ScrollId, nil +} + +func (c *client) getSource(val interface{}) Source { + + s := Source{} + + bs, err := json.Marshal(val) if err != nil { - return nil, id, err + return s } - var data []string - for _, hit := range res.AllHits { - data = append(data, hit.Log) + err = json.Unmarshal(bs, &s) + if err != nil { + return s } - return data, res.ScrollId, nil + + return s } -func (es *Elasticsearch) ClearScroll(id string) { - if id != "" { - es.c.ClearScroll(id) +func parseToQueryPart(sf logging.SearchFilter) *query.Query { + + var mini int32 = 1 + b := query.NewBool() + + bi := query.NewBool().WithMinimumShouldMatch(mini) + for ns, t := range sf.NamespaceFilter { + ct := time.Time{} + if t != nil { + ct = *t + } + + bi.AppendShould(query.NewBool(). + AppendFilter(query.NewMatchPhrase("kubernetes.namespace_name.keyword", ns)). + AppendFilter(query.NewRange("time").WithGTE(ct))) + } + b.AppendFilter(bi) + + if sf.WorkloadFilter != nil { + bi := query.NewBool().WithMinimumShouldMatch(mini) + for _, wk := range sf.WorkloadFilter { + bi.AppendShould(query.NewRegexp("kubernetes.pod_name.keyword", podNameRegexp(wk))) + } + + b.AppendFilter(bi) } + + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrase("kubernetes.pod_name.keyword", sf.PodFilter)). + WithMinimumShouldMatch(mini)) + + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrase("kubernetes.container_name.keyword", sf.ContainerFilter)). + WithMinimumShouldMatch(mini)) + + // fuzzy matching + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrasePrefix("kubernetes.pod_name", sf.WorkloadSearch)). + WithMinimumShouldMatch(mini)) + + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrasePrefix("kubernetes.pod_name", sf.PodSearch)). + WithMinimumShouldMatch(mini)) + + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrasePrefix("kubernetes.container_name", sf.ContainerSearch)). + WithMinimumShouldMatch(mini)) + + b.AppendFilter(query.NewBool(). + AppendMultiShould(query.NewMultiMatchPhrasePrefix("log", sf.LogSearch)). + WithMinimumShouldMatch(mini)) + + r := query.NewRange("time") + if !sf.Starttime.IsZero() { + r.WithGTE(sf.Starttime) + } + if !sf.Endtime.IsZero() { + r.WithLTE(sf.Endtime) + } + + b.AppendFilter(r) + + return query.NewQuery().WithBool(b) +} + +func podNameRegexp(workloadName string) string { + var regexp string + if len(workloadName) <= podNameMaxLength-replicaSetSuffixMaxLength-podNameSuffixLength { + // match deployment pods, eg. <deploy>-579dfbcddd-24znw + // replicaset rand string is limited to vowels + // https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/rand/rand.go#L83 + regexp += workloadName + "-[bcdfghjklmnpqrstvwxz2456789]{1,10}-[a-z0-9]{5}|" + // match statefulset pods, eg. <sts>-0 + regexp += workloadName + "-[0-9]+|" + // match pods of daemonset or job, eg. <ds>-29tdk, <job>-5xqvl + regexp += workloadName + "-[a-z0-9]{5}" + } else if len(workloadName) <= podNameMaxLength-podNameSuffixLength { + replicaSetSuffixLength := podNameMaxLength - podNameSuffixLength - len(workloadName) + regexp += fmt.Sprintf("%s%d%s", workloadName+"-[bcdfghjklmnpqrstvwxz2456789]{", replicaSetSuffixLength, "}[a-z0-9]{5}|") + regexp += workloadName + "-[0-9]+|" + regexp += workloadName + "-[a-z0-9]{5}" + } else { + // Rand suffix may overwrites the workload name if the name is too long + // This won't happen for StatefulSet because long name will cause ReplicaSet fails during StatefulSet creation. + regexp += workloadName[:podNameMaxLength-podNameSuffixLength+1] + "[a-z0-9]{5}|" + regexp += workloadName + "-[0-9]+" + } + return regexp } diff --git a/pkg/simple/client/logging/elasticsearch/elasticsearch_test.go b/pkg/simple/client/logging/elasticsearch/elasticsearch_test.go index 5ffed0c0ef5d94817c103d1d3fac2786214a40a7..34a53ada86a92080d8e06711b68a6355ff806038 100644 --- a/pkg/simple/client/logging/elasticsearch/elasticsearch_test.go +++ b/pkg/simple/client/logging/elasticsearch/elasticsearch_test.go @@ -21,48 +21,15 @@ import ( "github.com/google/go-cmp/cmp" "github.com/json-iterator/go" "io/ioutil" + "kubesphere.io/kubesphere/pkg/simple/client/es" + "kubesphere.io/kubesphere/pkg/simple/client/es/query" "kubesphere.io/kubesphere/pkg/simple/client/logging" - "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v5" - "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v6" - "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v7" "net/http" "net/http/httptest" "testing" + "time" ) -func TestInitClient(t *testing.T) { - var tests = []struct { - fakeResp string - expected string - }{ - { - fakeResp: "es6_detect_version_major_200.json", - expected: ElasticV6, - }, - { - fakeResp: "es7_detect_version_major_200.json", - expected: ElasticV7, - }, - } - - for i, test := range tests { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - es := mockElasticsearchService("/", test.fakeResp, http.StatusOK) - defer es.Close() - - client := &Elasticsearch{host: es.URL} - err := client.loadClient() - if err != nil { - t.Fatal(err) - } - - if diff := cmp.Diff(client.version, test.expected); diff != "" { - t.Fatalf("%T differ (-got, +want): %s", test.expected, diff) - } - }) - } -} - func TestGetCurrentStats(t *testing.T) { var tests = []struct { fakeVersion string @@ -72,7 +39,7 @@ func TestGetCurrentStats(t *testing.T) { expectedErr string }{ { - fakeVersion: ElasticV6, + fakeVersion: es.ElasticV6, fakeResp: "es6_get_current_stats_200.json", fakeCode: http.StatusOK, expected: logging.Statistics{ @@ -81,13 +48,13 @@ func TestGetCurrentStats(t *testing.T) { }, }, { - fakeVersion: ElasticV6, + fakeVersion: es.ElasticV6, fakeResp: "es6_get_current_stats_404.json", fakeCode: http.StatusNotFound, expectedErr: "type: index_not_found_exception, reason: no such index", }, { - fakeVersion: ElasticV7, + fakeVersion: es.ElasticV7, fakeResp: "es7_get_current_stats_200.json", fakeCode: http.StatusOK, expected: logging.Statistics{ @@ -96,7 +63,7 @@ func TestGetCurrentStats(t *testing.T) { }, }, { - fakeVersion: ElasticV7, + fakeVersion: es.ElasticV7, fakeResp: "es7_get_current_stats_404.json", fakeCode: http.StatusNotFound, expectedErr: "type: index_not_found_exception, reason: no such index [ks-logstash-log-2020.05.2]", @@ -108,9 +75,16 @@ func TestGetCurrentStats(t *testing.T) { srv := mockElasticsearchService("/ks-logstash-log*/_search", test.fakeResp, test.fakeCode) defer srv.Close() - es := newElasticsearchClient(srv, test.fakeVersion) + client, err := NewClient(&logging.Options{ + Host: srv.URL, + IndexPrefix: "ks-logstash-log", + Version: test.fakeVersion, + }) + if err != nil { + t.Fatalf("create client error, %s", err) + } - result, err := es.GetCurrentStats(logging.SearchFilter{}) + result, err := client.GetCurrentStats(logging.SearchFilter{}) if test.expectedErr != "" { if diff := cmp.Diff(fmt.Sprint(err), test.expectedErr); diff != "" { t.Fatalf("%T differ (-got, +want): %s", test.expectedErr, diff) @@ -132,7 +106,7 @@ func TestCountLogsByInterval(t *testing.T) { expectedErr string }{ { - fakeVersion: ElasticV7, + fakeVersion: es.ElasticV7, fakeResp: "es7_count_logs_by_interval_200.json", fakeCode: http.StatusOK, expected: logging.Histogram{ @@ -154,13 +128,13 @@ func TestCountLogsByInterval(t *testing.T) { }, }, { - fakeVersion: ElasticV7, + fakeVersion: es.ElasticV7, fakeResp: "es7_count_logs_by_interval_400.json", fakeCode: http.StatusBadRequest, expectedErr: "type: search_phase_execution_exception, reason: all shards failed", }, { - fakeVersion: ElasticV7, + fakeVersion: es.ElasticV7, fakeResp: "es7_count_logs_by_interval_404.json", fakeCode: http.StatusNotFound, expectedErr: "type: index_not_found_exception, reason: no such index [ks-logstash-log-20]", @@ -172,9 +146,16 @@ func TestCountLogsByInterval(t *testing.T) { srv := mockElasticsearchService("/ks-logstash-log*/_search", test.fakeResp, test.fakeCode) defer srv.Close() - es := newElasticsearchClient(srv, test.fakeVersion) + client, err := NewClient(&logging.Options{ + Host: srv.URL, + IndexPrefix: "ks-logstash-log", + Version: test.fakeVersion, + }) + if err != nil { + t.Fatalf("create client error, %s", err) + } - result, err := es.CountLogsByInterval(logging.SearchFilter{}, "15m") + result, err := client.CountLogsByInterval(logging.SearchFilter{}, "15m") if test.expectedErr != "" { if diff := cmp.Diff(fmt.Sprint(err), test.expectedErr); diff != "" { t.Fatalf("%T differ (-got, +want): %s", test.expectedErr, diff) @@ -196,7 +177,7 @@ func TestSearchLogs(t *testing.T) { expectedErr string }{ { - fakeVersion: ElasticV7, + fakeVersion: es.ElasticV7, fakeResp: "es7_search_logs_200.json", fakeCode: http.StatusOK, expected: "es7_search_logs_200_result.json", @@ -214,9 +195,16 @@ func TestSearchLogs(t *testing.T) { srv := mockElasticsearchService("/ks-logstash-log*/_search", test.fakeResp, test.fakeCode) defer srv.Close() - es := newElasticsearchClient(srv, test.fakeVersion) + client, err := NewClient(&logging.Options{ + Host: srv.URL, + IndexPrefix: "ks-logstash-log", + Version: test.fakeVersion, + }) + if err != nil { + t.Fatalf("create client error, %s", err) + } - result, err := es.SearchLogs(logging.SearchFilter{}, 0, 10, "asc") + result, err := client.SearchLogs(logging.SearchFilter{}, 0, 10, "asc") if test.expectedErr != "" { if diff := cmp.Diff(fmt.Sprint(err), test.expectedErr); diff != "" { t.Fatalf("%T differ (-got, +want): %s", test.expectedErr, diff) @@ -229,29 +217,84 @@ func TestSearchLogs(t *testing.T) { } } +func TestParseToQueryPart(t *testing.T) { + var tests = []struct { + filter logging.SearchFilter + expected string + }{ + { + filter: logging.SearchFilter{ + NamespaceFilter: map[string]*time.Time{ + "default": func() *time.Time { t := time.Unix(1589981934, 0); return &t }(), + }, + }, + expected: "api_body_1.json", + }, + { + filter: logging.SearchFilter{ + WorkloadFilter: []string{"mysql"}, + Starttime: time.Unix(1589980934, 0), + Endtime: time.Unix(1589981934, 0), + }, + expected: "api_body_2.json", + }, + { + filter: logging.SearchFilter{ + PodFilter: []string{"mysql"}, + PodSearch: []string{"mysql-a8w3s-10945j"}, + LogSearch: []string{"info"}, + }, + expected: "api_body_3.json", + }, + { + filter: logging.SearchFilter{ + ContainerFilter: []string{"mysql-1"}, + ContainerSearch: []string{"mysql-3"}, + }, + expected: "api_body_4.json", + }, + { + filter: logging.SearchFilter{ + Starttime: time.Unix(1590744676, 0), + }, + expected: "api_body_7.json", + }, + { + filter: logging.SearchFilter{ + NamespaceFilter: map[string]*time.Time{ + "default": nil, + }, + }, + expected: "api_body_8.json", + }, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + + expected, err := ioutil.ReadFile(fmt.Sprintf("./testdata/%s", test.expected)) + if err != nil { + t.Fatalf("read expected error, %s", err.Error()) + } + + result, _ := query.NewBuilder().WithQuery(parseToQueryPart(test.filter)).Bytes() + if diff := cmp.Diff(string(result), string(result)); diff != "" { + t.Fatalf("%T differ (-got, +want): %s", expected, diff) + } + }) + } +} + func mockElasticsearchService(pattern, fakeResp string, fakeCode int) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc(pattern, func(res http.ResponseWriter, req *http.Request) { b, _ := ioutil.ReadFile(fmt.Sprintf("./testdata/%s", fakeResp)) res.WriteHeader(fakeCode) - res.Write(b) + _, _ = res.Write(b) }) return httptest.NewServer(mux) } -func newElasticsearchClient(srv *httptest.Server, version string) *Elasticsearch { - es := &Elasticsearch{index: "ks-logstash-log"} - switch version { - case ElasticV5: - es.c, _ = v5.New(srv.URL, "ks-logstash-log") - case ElasticV6: - es.c, _ = v6.New(srv.URL, "ks-logstash-log") - case ElasticV7: - es.c, _ = v7.New(srv.URL, "ks-logstash-log") - } - return es -} - func JsonFromFile(expectedFile string, expectedJsonPtr interface{}) error { json, err := ioutil.ReadFile(fmt.Sprintf("./testdata/%s", expectedFile)) if err != nil { diff --git a/pkg/simple/client/logging/elasticsearch/testdata/es6_get_current_stats_200.json b/pkg/simple/client/logging/elasticsearch/testdata/es6_get_current_stats_200.json index db91c786c76ed00d6223aa33f735b899c0f1df9a..1f70772f7e7cd60312a7e7ad75f083738b1188cf 100644 --- a/pkg/simple/client/logging/elasticsearch/testdata/es6_get_current_stats_200.json +++ b/pkg/simple/client/logging/elasticsearch/testdata/es6_get_current_stats_200.json @@ -204,8 +204,8 @@ ] }, "aggregations": { - "container_count": { + "cardinality_aggregation": { "value": 93 } } -} \ No newline at end of file +} diff --git a/pkg/simple/client/logging/elasticsearch/testdata/es7_count_logs_by_interval_200.json b/pkg/simple/client/logging/elasticsearch/testdata/es7_count_logs_by_interval_200.json index ab014e8bea3ffc5625e081bb60b42cf813c83c06..9ec221b0cc62746b4f28d47f1e80b6f959fc65f1 100644 --- a/pkg/simple/client/logging/elasticsearch/testdata/es7_count_logs_by_interval_200.json +++ b/pkg/simple/client/logging/elasticsearch/testdata/es7_count_logs_by_interval_200.json @@ -207,7 +207,7 @@ ] }, "aggregations": { - "log_count_over_time": { + "date_histogram_aggregation": { "buckets": [ { "key_as_string": "2020-05-16T16:00:00.000Z", @@ -227,4 +227,4 @@ ] } } -} \ No newline at end of file +} diff --git a/pkg/simple/client/logging/elasticsearch/testdata/es7_get_current_stats_200.json b/pkg/simple/client/logging/elasticsearch/testdata/es7_get_current_stats_200.json index b4b55c73171f1fc0b84c4036ce41c0252a649e8a..96b9c993b1d5bb7cff42e1146e2d4bc8865dc39e 100644 --- a/pkg/simple/client/logging/elasticsearch/testdata/es7_get_current_stats_200.json +++ b/pkg/simple/client/logging/elasticsearch/testdata/es7_get_current_stats_200.json @@ -207,8 +207,8 @@ ] }, "aggregations": { - "container_count": { + "cardinality_aggregation": { "value": 48 } } -} \ No newline at end of file +} diff --git a/pkg/simple/client/logging/interface.go b/pkg/simple/client/logging/interface.go index 52d2b370f4252208b71eebfe91cdaff92586a21d..bbb1614662dd6da745d977a21e521c977460d973 100644 --- a/pkg/simple/client/logging/interface.go +++ b/pkg/simple/client/logging/interface.go @@ -21,7 +21,7 @@ import ( "time" ) -type Interface interface { +type Client interface { GetCurrentStats(sf SearchFilter) (Statistics, error) CountLogsByInterval(sf SearchFilter, interval string) (Histogram, error) SearchLogs(sf SearchFilter, from, size int64, order string) (Logs, error) diff --git a/pkg/simple/client/logging/elasticsearch/options.go b/pkg/simple/client/logging/options.go similarity index 84% rename from pkg/simple/client/logging/elasticsearch/options.go rename to pkg/simple/client/logging/options.go index 5526f562f655e7bb88099f1d8964c4a53a53a365..115bef294b3071fe3cf854ec08fefc1e3150ce03 100644 --- a/pkg/simple/client/logging/elasticsearch/options.go +++ b/pkg/simple/client/logging/options.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package elasticsearch +package logging import ( "github.com/spf13/pflag" @@ -27,7 +27,7 @@ type Options struct { Version string `json:"version" yaml:"version"` } -func NewElasticSearchOptions() *Options { +func NewLoggingOptions() *Options { return &Options{ Host: "", IndexPrefix: "fluentbit", @@ -42,21 +42,20 @@ func (s *Options) ApplyTo(options *Options) { } func (s *Options) Validate() []error { - errs := []error{} - + errs := make([]error, 0) return errs } func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) { - fs.StringVar(&s.Host, "elasticsearch-host", c.Host, ""+ + fs.StringVar(&s.Host, "logging-elasticsearch-host", c.Host, ""+ "Elasticsearch logging service host. KubeSphere is using elastic as log store, "+ "if this filed left blank, KubeSphere will use kubernetes builtin log API instead, and"+ " the following elastic search options will be ignored.") - fs.StringVar(&s.IndexPrefix, "index-prefix", c.IndexPrefix, ""+ + fs.StringVar(&s.IndexPrefix, "logging-index-prefix", c.IndexPrefix, ""+ "Index name prefix. KubeSphere will retrieve logs against indices matching the prefix.") - fs.StringVar(&s.Version, "elasticsearch-version", c.Version, ""+ + fs.StringVar(&s.Version, "logging-elasticsearch-version", c.Version, ""+ "Elasticsearch major version, e.g. 5/6/7, if left blank, will detect automatically."+ "Currently, minimum supported version is 5.x") }