diff --git a/pkg/simple/client/auditing/elasticsearch/elasticsearch.go b/pkg/simple/client/auditing/elasticsearch/elasticsearch.go index 3f49d5232ffb4e621e332ca5da13403c11284026..cf08ea412b2221842b320d0adc02ba73569799ab 100644 --- a/pkg/simple/client/auditing/elasticsearch/elasticsearch.go +++ b/pkg/simple/client/auditing/elasticsearch/elasticsearch.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "strings" + "sync" "time" es5 "github.com/elastic/go-elasticsearch/v5" @@ -29,17 +30,30 @@ import ( "kubesphere.io/kubesphere/pkg/simple/client/auditing" ) +const ( + ElasticV5 = "5" + ElasticV6 = "6" + ElasticV7 = "7" +) + var json = jsoniter.ConfigCompatibleWithStandardLibrary type Elasticsearch struct { - c client - opts struct { - index string - } + host string + version string + index string + + c client + mux sync.Mutex } func (es *Elasticsearch) SearchAuditingEvent(filter *auditing.Filter, from, size int64, sort string) (*auditing.Events, error) { + + if err := es.loadClient(); err != nil { + return &auditing.Events{}, err + } + queryPart := parseToQueryPart(filter) if sort == "" { sort = "desc" @@ -59,7 +73,7 @@ func (es *Elasticsearch) SearchAuditingEvent(filter *auditing.Filter, from, size return nil, err } resp, err := es.c.ExSearch(&Request{ - Index: es.opts.index, + Index: es.index, Body: bytes.NewBuffer(body), }) if err != nil || resp == nil { @@ -80,6 +94,11 @@ func (es *Elasticsearch) SearchAuditingEvent(filter *auditing.Filter, from, size } func (es *Elasticsearch) CountOverTime(filter *auditing.Filter, interval string) (*auditing.Histogram, error) { + + if err := es.loadClient(); err != nil { + return &auditing.Histogram{}, err + } + if interval == "" { interval = "15m" } @@ -105,7 +124,7 @@ func (es *Elasticsearch) CountOverTime(filter *auditing.Filter, interval string) return nil, err } resp, err := es.c.ExSearch(&Request{ - Index: es.opts.index, + Index: es.index, Body: bytes.NewBuffer(body), }) if err != nil || resp == nil { @@ -135,6 +154,11 @@ func (es *Elasticsearch) CountOverTime(filter *auditing.Filter, interval string) } func (es *Elasticsearch) StatisticsOnResources(filter *auditing.Filter) (*auditing.Statistics, error) { + + if err := es.loadClient(); err != nil { + return &auditing.Statistics{}, err + } + queryPart := parseToQueryPart(filter) aggName := "resources_count" aggsPart := map[string]interface{}{ @@ -155,7 +179,7 @@ func (es *Elasticsearch) StatisticsOnResources(filter *auditing.Filter) (*auditi return nil, err } resp, err := es.c.ExSearch(&Request{ - Index: es.opts.index, + Index: es.index, Body: bytes.NewBuffer(body), }) if err != nil || resp == nil { @@ -180,63 +204,89 @@ func (es *Elasticsearch) StatisticsOnResources(filter *auditing.Filter) (*auditi } func NewClient(options *Options) (*Elasticsearch, error) { + es := &Elasticsearch{ + host: options.Host, + version: options.Version, + index: fmt.Sprintf("%s*", options.IndexPrefix), + } + + err := es.initEsClient(es.version) + return es, err +} + +func (es *Elasticsearch) initEsClient(version string) error { clientV5 := func() (*ClientV5, error) { - c, err := es5.NewClient(es5.Config{Addresses: []string{options.Host}}) + c, err := es5.NewClient(es5.Config{Addresses: []string{es.host}}) if err != nil { return nil, err } return (*ClientV5)(c), nil } clientV6 := func() (*ClientV6, error) { - c, err := es6.NewClient(es6.Config{Addresses: []string{options.Host}}) + c, err := es6.NewClient(es6.Config{Addresses: []string{es.host}}) if err != nil { return nil, err } return (*ClientV6)(c), nil } clientV7 := func() (*ClientV7, error) { - c, err := es7.NewClient(es7.Config{Addresses: []string{options.Host}}) + c, err := es7.NewClient(es7.Config{Addresses: []string{es.host}}) if err != nil { return nil, err } return (*ClientV7)(c), nil } - var ( - version = options.Version - es = Elasticsearch{} - err error - ) - es.opts.index = fmt.Sprintf("%s*", options.IndexPrefix) + var err error + switch version { + case ElasticV5: + es.c, err = clientV5() + case ElasticV6: + es.c, err = clientV6() + case ElasticV7: + es.c, err = clientV7() + case "": + es.c = nil + default: + err = fmt.Errorf("unsupported elasticsearch version %s", es.version) + } + + return err +} - if options.Version == "" { - var c5 *ClientV5 - if c5, err = clientV5(); err == nil { - if version, err = c5.Version(); err == nil { - es.c = c5 - } - } +func (es *Elasticsearch) loadClient() error { + + // Check if Elasticsearch client has been initialized. + if es.c != nil { + return nil } - if err != nil { - return nil, err + + // Create Elasticsearch client. + es.mux.Lock() + defer es.mux.Unlock() + + if es.c != nil { + return nil } - switch strings.Split(version, ".")[0] { - case "5": - if es.c == nil { - es.c, err = clientV5() - } - case "6": - es.c, err = clientV6() - case "7": - es.c, err = clientV7() - default: - err = fmt.Errorf("unsupported elasticsearch version %s", version) + c, e := es5.NewClient(es5.Config{Addresses: []string{es.host}}) + if e != nil { + return e + } + + version, err := (*ClientV5)(c).Version() + if err != nil { + return err } + + v := strings.Split(version, ".")[0] + err = es.initEsClient(v) if err != nil { - return nil, err + return err } - return &es, nil + + es.version = v + return nil } func parseToQueryPart(f *auditing.Filter) interface{} {