未验证 提交 b2402105 编写于 作者: K KubeSphere CI Bot 提交者: GitHub

Merge pull request #2441 from junotx/master

lazy init events client and optimize events search
......@@ -41,7 +41,8 @@ func (e Error) Error() string {
type ClientV5 es5.Client
func (c *ClientV5) ExSearch(r *Request) (*Response, error) {
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body),
c.Search.WithIgnoreUnavailable(true)))
}
func (c *ClientV5) parse(resp *es5api.Response, err error) (*Response, error) {
if err != nil {
......@@ -85,7 +86,8 @@ func (c *ClientV5) Version() (string, error) {
type ClientV6 es6.Client
func (c *ClientV6) ExSearch(r *Request) (*Response, error) {
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body),
c.Search.WithIgnoreUnavailable(true)))
}
func (c *ClientV6) parse(resp *es6api.Response, err error) (*Response, error) {
if err != nil {
......@@ -114,7 +116,8 @@ func (c *ClientV6) parse(resp *es6api.Response, err error) (*Response, error) {
type ClientV7 es7.Client
func (c *ClientV7) ExSearch(r *Request) (*Response, error) {
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body),
c.Search.WithIgnoreUnavailable(true)))
}
func (c *ClientV7) parse(resp *es7api.Response, err error) (*Response, error) {
if err != nil {
......
......@@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"strings"
"sync"
"time"
es5 "github.com/elastic/go-elasticsearch/v5"
......@@ -12,18 +13,19 @@ import (
jsoniter "github.com/json-iterator/go"
corev1 "k8s.io/api/core/v1"
"kubesphere.io/kubesphere/pkg/simple/client/events"
"kubesphere.io/kubesphere/pkg/utils/esutil"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Elasticsearch struct {
type elasticsearch struct {
c client
opts struct {
index string
indexPrefix string
}
}
func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64,
func (es *elasticsearch) SearchEvents(filter *events.Filter, from, size int64,
sort string) (*events.Events, error) {
queryPart := parseToQueryPart(filter)
if sort == "" {
......@@ -44,7 +46,7 @@ func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64,
return nil, err
}
resp, err := es.c.ExSearch(&Request{
Index: es.opts.index,
Index: resolveIndexNames(es.opts.indexPrefix, filter.StartTime, filter.EndTime),
Body: bytes.NewBuffer(body),
})
if err != nil || resp == nil {
......@@ -64,7 +66,7 @@ func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64,
return &evts, nil
}
func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) (*events.Histogram, error) {
func (es *elasticsearch) CountOverTime(filter *events.Filter, interval string) (*events.Histogram, error) {
if interval == "" {
interval = "15m"
}
......@@ -90,7 +92,7 @@ func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) (
return nil, err
}
resp, err := es.c.ExSearch(&Request{
Index: es.opts.index,
Index: resolveIndexNames(es.opts.indexPrefix, filter.StartTime, filter.EndTime),
Body: bytes.NewBuffer(body),
})
if err != nil || resp == nil {
......@@ -116,7 +118,7 @@ func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) (
return &histo, nil
}
func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.Statistics, error) {
func (es *elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.Statistics, error) {
queryPart := parseToQueryPart(filter)
aggName := "resources_count"
aggsPart := map[string]interface{}{
......@@ -137,7 +139,7 @@ func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.S
return nil, err
}
resp, err := es.c.ExSearch(&Request{
Index: es.opts.index,
Index: resolveIndexNames(es.opts.indexPrefix, filter.StartTime, filter.EndTime),
Body: bytes.NewBuffer(body),
})
if err != nil || resp == nil {
......@@ -158,7 +160,7 @@ func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.S
}, nil
}
func NewClient(options *Options) (*Elasticsearch, error) {
func newClient(options *Options) (*elasticsearch, error) {
clientV5 := func() (*ClientV5, error) {
c, err := es5.NewClient(es5.Config{Addresses: []string{options.Host}})
if err != nil {
......@@ -183,10 +185,10 @@ func NewClient(options *Options) (*Elasticsearch, error) {
var (
version = options.Version
es = Elasticsearch{}
es = elasticsearch{}
err error
)
es.opts.index = fmt.Sprintf("%s*", options.IndexPrefix)
es.opts.indexPrefix = options.IndexPrefix
if options.Version == "" {
var c5 *ClientV5
......@@ -218,6 +220,58 @@ func NewClient(options *Options) (*Elasticsearch, error) {
return &es, nil
}
type Elasticsearch struct {
innerEs *elasticsearch
options Options
mutex sync.Mutex
}
func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64,
sort string) (*events.Events, error) {
ies, e := es.getInnerEs()
if e != nil {
return nil, e
}
return ies.SearchEvents(filter, from, size, sort)
}
func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) (*events.Histogram, error) {
ies, e := es.getInnerEs()
if e != nil {
return nil, e
}
return ies.CountOverTime(filter, interval)
}
func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.Statistics, error) {
ies, e := es.getInnerEs()
if e != nil {
return nil, e
}
return ies.StatisticsOnResources(filter)
}
func (es *Elasticsearch) getInnerEs() (*elasticsearch, error) {
if es.innerEs != nil {
return es.innerEs, nil
}
es.mutex.Lock()
defer es.mutex.Unlock()
if es.innerEs != nil {
return es.innerEs, nil
}
ies, err := newClient(&es.options)
if err != nil {
return nil, err
}
es.innerEs = ies
return es.innerEs, nil
}
func NewClient(options *Options) (*Elasticsearch, error) {
return &Elasticsearch{options: *options}, nil
}
func parseToQueryPart(f *events.Filter) interface{} {
if f == nil {
return nil
......@@ -347,3 +401,14 @@ func parseToQueryPart(f *events.Filter) interface{} {
return queryBody
}
func resolveIndexNames(prefix string, start, end *time.Time) string {
var s, e time.Time
if start != nil {
s = *start
}
if end != nil {
e = *end
}
return esutil.ResolveIndexNames(prefix, s, e)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册