diff --git a/pkg/simple/client/events/elasticsearch/clients.go b/pkg/simple/client/events/elasticsearch/clients.go index ddbd99051da1d288718604e54af4ca346d613b0a..5f2308651c82bec286674f7cf663c0f390d3d6ff 100644 --- a/pkg/simple/client/events/elasticsearch/clients.go +++ b/pkg/simple/client/events/elasticsearch/clients.go @@ -41,7 +41,8 @@ func (e Error) Error() string { type ClientV5 es5.Client func (c *ClientV5) ExSearch(r *Request) (*Response, error) { - return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body))) + return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body), + c.Search.WithIgnoreUnavailable(true))) } func (c *ClientV5) parse(resp *es5api.Response, err error) (*Response, error) { if err != nil { @@ -85,7 +86,8 @@ func (c *ClientV5) Version() (string, error) { type ClientV6 es6.Client func (c *ClientV6) ExSearch(r *Request) (*Response, error) { - return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body))) + return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body), + c.Search.WithIgnoreUnavailable(true))) } func (c *ClientV6) parse(resp *es6api.Response, err error) (*Response, error) { if err != nil { @@ -114,7 +116,8 @@ func (c *ClientV6) parse(resp *es6api.Response, err error) (*Response, error) { type ClientV7 es7.Client func (c *ClientV7) ExSearch(r *Request) (*Response, error) { - return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body))) + return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body), + c.Search.WithIgnoreUnavailable(true))) } func (c *ClientV7) parse(resp *es7api.Response, err error) (*Response, error) { if err != nil { diff --git a/pkg/simple/client/events/elasticsearch/elasticsearch.go b/pkg/simple/client/events/elasticsearch/elasticsearch.go index ca22ae8741b0371c66ca99a07fb2657a1d9891fd..281afe1543cac31c5741c898117e19205c362568 100644 --- a/pkg/simple/client/events/elasticsearch/elasticsearch.go +++ b/pkg/simple/client/events/elasticsearch/elasticsearch.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "strings" + "sync" "time" es5 "github.com/elastic/go-elasticsearch/v5" @@ -12,18 +13,19 @@ import ( jsoniter "github.com/json-iterator/go" corev1 "k8s.io/api/core/v1" "kubesphere.io/kubesphere/pkg/simple/client/events" + "kubesphere.io/kubesphere/pkg/utils/esutil" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary -type Elasticsearch struct { +type elasticsearch struct { c client opts struct { - index string + indexPrefix string } } -func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64, +func (es *elasticsearch) SearchEvents(filter *events.Filter, from, size int64, sort string) (*events.Events, error) { queryPart := parseToQueryPart(filter) if sort == "" { @@ -44,7 +46,7 @@ func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64, return nil, err } resp, err := es.c.ExSearch(&Request{ - Index: es.opts.index, + Index: resolveIndexNames(es.opts.indexPrefix, filter.StartTime, filter.EndTime), Body: bytes.NewBuffer(body), }) if err != nil || resp == nil { @@ -64,7 +66,7 @@ func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64, return &evts, nil } -func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) (*events.Histogram, error) { +func (es *elasticsearch) CountOverTime(filter *events.Filter, interval string) (*events.Histogram, error) { if interval == "" { interval = "15m" } @@ -90,7 +92,7 @@ func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) ( return nil, err } resp, err := es.c.ExSearch(&Request{ - Index: es.opts.index, + Index: resolveIndexNames(es.opts.indexPrefix, filter.StartTime, filter.EndTime), Body: bytes.NewBuffer(body), }) if err != nil || resp == nil { @@ -116,7 +118,7 @@ func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) ( return &histo, nil } -func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.Statistics, error) { +func (es *elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.Statistics, error) { queryPart := parseToQueryPart(filter) aggName := "resources_count" aggsPart := map[string]interface{}{ @@ -137,7 +139,7 @@ func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.S return nil, err } resp, err := es.c.ExSearch(&Request{ - Index: es.opts.index, + Index: resolveIndexNames(es.opts.indexPrefix, filter.StartTime, filter.EndTime), Body: bytes.NewBuffer(body), }) if err != nil || resp == nil { @@ -158,7 +160,7 @@ func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.S }, nil } -func NewClient(options *Options) (*Elasticsearch, error) { +func newClient(options *Options) (*elasticsearch, error) { clientV5 := func() (*ClientV5, error) { c, err := es5.NewClient(es5.Config{Addresses: []string{options.Host}}) if err != nil { @@ -183,10 +185,10 @@ func NewClient(options *Options) (*Elasticsearch, error) { var ( version = options.Version - es = Elasticsearch{} + es = elasticsearch{} err error ) - es.opts.index = fmt.Sprintf("%s*", options.IndexPrefix) + es.opts.indexPrefix = options.IndexPrefix if options.Version == "" { var c5 *ClientV5 @@ -218,6 +220,58 @@ func NewClient(options *Options) (*Elasticsearch, error) { return &es, nil } +type Elasticsearch struct { + innerEs *elasticsearch + options Options + mutex sync.Mutex +} + +func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64, + sort string) (*events.Events, error) { + ies, e := es.getInnerEs() + if e != nil { + return nil, e + } + return ies.SearchEvents(filter, from, size, sort) +} + +func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) (*events.Histogram, error) { + ies, e := es.getInnerEs() + if e != nil { + return nil, e + } + return ies.CountOverTime(filter, interval) +} + +func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.Statistics, error) { + ies, e := es.getInnerEs() + if e != nil { + return nil, e + } + return ies.StatisticsOnResources(filter) +} + +func (es *Elasticsearch) getInnerEs() (*elasticsearch, error) { + if es.innerEs != nil { + return es.innerEs, nil + } + es.mutex.Lock() + defer es.mutex.Unlock() + if es.innerEs != nil { + return es.innerEs, nil + } + ies, err := newClient(&es.options) + if err != nil { + return nil, err + } + es.innerEs = ies + return es.innerEs, nil +} + +func NewClient(options *Options) (*Elasticsearch, error) { + return &Elasticsearch{options: *options}, nil +} + func parseToQueryPart(f *events.Filter) interface{} { if f == nil { return nil @@ -347,3 +401,14 @@ func parseToQueryPart(f *events.Filter) interface{} { return queryBody } + +func resolveIndexNames(prefix string, start, end *time.Time) string { + var s, e time.Time + if start != nil { + s = *start + } + if end != nil { + e = *end + } + return esutil.ResolveIndexNames(prefix, s, e) +}