未验证 提交 f8531ec2 编写于 作者: K KubeSphere CI Bot 提交者: GitHub

Merge pull request #768 from huanggze/log-download

feat: allow to export logs
......@@ -2,6 +2,14 @@ package v1alpha2
import (
"encoding/json"
"time"
)
const (
OperationQuery int = iota
OperationStatistics
OperationHistogram
OperationExport
)
// elasticsearch client config
......@@ -28,13 +36,14 @@ type QueryParameters struct {
ContainerQuery []string
LogQuery []string
Operation string
Interval string
StartTime string
EndTime string
Sort string
From int64
Size int64
Operation int
Interval string
StartTime string
EndTime string
Sort string
From int64
Size int64
ScrollTimeout time.Duration
}
// elasticsearch request body
......@@ -148,8 +157,7 @@ type DateHistogram struct {
// Fore more info, refer to https://www.elastic.co/guide/en/elasticsearch/reference/current/getting-started-search-API.html
// Response body from the elasticsearch engine
type Response struct {
Status int `json:"status"`
Workspace string `json:"workspace,omitempty"`
ScrollId string `json:"_scroll_id"`
Shards Shards `json:"_shards"`
Hits Hits `json:"hits"`
Aggregations json.RawMessage `json:"aggregations"`
......@@ -195,7 +203,7 @@ type HighLight struct {
}
type LogRecord struct {
Time int64 `json:"time,omitempty" description:"log timestamp"`
Time string `json:"time,omitempty" description:"log timestamp"`
Log string `json:"log,omitempty" description:"log message"`
Namespace string `json:"namespace,omitempty" description:"namespace"`
Pod string `json:"pod,omitempty" description:"pod name"`
......@@ -205,10 +213,9 @@ type LogRecord struct {
}
type ReadResult struct {
Total int64 `json:"total" description:"total number of matched results"`
From int64 `json:"from" description:"the offset from the result set"`
Size int64 `json:"size" description:"the amount of hits to be returned"`
Records []LogRecord `json:"records,omitempty" description:"actual array of results"`
ScrollID string `json:"_scroll_id,omitempty"`
Total int64 `json:"total" description:"total number of matched results"`
Records []LogRecord `json:"records,omitempty" description:"actual array of results"`
}
// StatisticsResponseAggregations, the struct for `aggregations` of type Reponse, holds return results from the aggregation StatisticsAggs
......@@ -245,16 +252,11 @@ type StatisticsResult struct {
type HistogramResult struct {
Total int64 `json:"total" description:"total number of logs"`
StartTime int64 `json:"start_time" description:"start time"`
EndTime int64 `json:"end_time" description:"end time"`
Interval string `json:"interval" description:"interval"`
Histograms []HistogramRecord `json:"histograms" description:"actual array of histogram results"`
}
// Wrap elasticsearch response
type QueryResult struct {
Status int `json:"status,omitempty" description:"query status"`
Error string `json:"error,omitempty" description:"debugging information"`
Read *ReadResult `json:"query,omitempty" description:"query results"`
Statistics *StatisticsResult `json:"statistics,omitempty" description:"statistics results"`
Histogram *HistogramResult `json:"histogram,omitempty" description:"histogram results"`
......
......@@ -47,7 +47,7 @@ func addWebService(c *restful.Container) error {
ws.Route(ws.GET("/cluster").To(logging.LoggingQueryCluster).
Doc("Query logs against the cluster.").
Param(ws.QueryParameter("operation", "Query type. This can be one of three types: query (for querying logs), statistics (for retrieving statistical data), and histogram (for displaying log count by time interval). Defaults to query.").DefaultValue("query").DataType("string").Required(false)).
Param(ws.QueryParameter("operation", "Operation type. This can be one of four types: query (for querying logs), statistics (for retrieving statistical data), histogram (for displaying log count by time interval) and export (for exporting logs). Defaults to query.").DefaultValue("query").DataType("string").Required(false)).
Param(ws.QueryParameter("workspaces", "A comma-separated list of workspaces. This field restricts the query to specified workspaces. For example, the following filter matches the workspace my-ws and demo-ws: `my-ws,demo-ws`").DataType("string").Required(false)).
Param(ws.QueryParameter("workspace_query", "A comma-separated list of keywords. Differing from **workspaces**, this field performs fuzzy matching on workspaces. For example, the following value limits the query to workspaces whose name contains the word my(My,MY,...) *OR* demo(Demo,DemO,...): `my,demo`.").DataType("string").Required(false)).
Param(ws.QueryParameter("namespaces", "A comma-separated list of namespaces. This field restricts the query to specified namespaces. For example, the following filter matches the namespace my-ns and demo-ns: `my-ns,demo-ns`").DataType("string").Required(false)).
......@@ -69,7 +69,7 @@ func addWebService(c *restful.Container) error {
Writes(v1alpha2.QueryResult{}).
Returns(http.StatusOK, RespOK, v1alpha2.QueryResult{})).
Consumes(restful.MIME_JSON, restful.MIME_XML).
Produces(restful.MIME_JSON)
Produces(restful.MIME_JSON, restful.MIME_OCTET)
ws.Route(ws.GET("/workspaces/{workspace}").To(logging.LoggingQueryWorkspace).
Doc("Query logs against the specific workspace.").
......@@ -166,7 +166,7 @@ func addWebService(c *restful.Container) error {
Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)).
Param(ws.PathParameter("pod", "Pod name.").DataType("string").Required(true)).
Param(ws.PathParameter("container", "Container name.").DataType("string").Required(true)).
Param(ws.QueryParameter("operation", "Query type. This can be one of three types: query (for querying logs), statistics (for retrieving statistical data), and histogram (for displaying log count by time interval). Defaults to query.").DefaultValue("query").DataType("string").Required(false)).
Param(ws.QueryParameter("operation", "Operation type. This can be one of four types: query (for querying logs), statistics (for retrieving statistical data), histogram (for displaying log count by time interval) and export (for exporting logs). Defaults to query.").DefaultValue("query").DataType("string").Required(false)).
Param(ws.QueryParameter("log_query", "A comma-separated list of keywords. The query returns logs which contain at least one keyword. Case-insensitive matching. For example, if the field is set to `err,INFO`, the query returns any log containing err(ERR,Err,...) *OR* INFO(info,InFo,...).").DataType("string").Required(false)).
Param(ws.QueryParameter("interval", "Time interval. It requires **operation** is set to histogram. The format is [0-9]+[smhdwMqy]. Defaults to 15m (i.e. 15 min).").DefaultValue("15m").DataType("string").Required(false)).
Param(ws.QueryParameter("start_time", "Start time of query. Default to 0. The format is a string representing milliseconds since the epoch, eg. 1559664000000.").DataType("string").Required(false)).
......@@ -178,7 +178,7 @@ func addWebService(c *restful.Container) error {
Writes(v1alpha2.QueryResult{}).
Returns(http.StatusOK, RespOK, v1alpha2.QueryResult{})).
Consumes(restful.MIME_JSON, restful.MIME_XML).
Produces(restful.MIME_JSON)
Produces(restful.MIME_JSON, restful.MIME_OCTET)
ws.Route(ws.GET("/fluentbit/outputs").To(logging.LoggingQueryFluentbitOutputs).
Doc("List all Fluent bit output plugins.").
......
......@@ -157,7 +157,7 @@ func addWebService(c *restful.Container) error {
ws.Route(ws.GET("/logs").
To(tenant.LogQuery).
Doc("Query cluster-level logs in a multi-tenants environment").
Param(ws.QueryParameter("operation", "Query type. This can be one of three types: query (for querying logs), statistics (for retrieving statistical data), and histogram (for displaying log count by time interval). Defaults to query.").DefaultValue("query").DataType("string").Required(false)).
Param(ws.QueryParameter("operation", "Operation type. This can be one of four types: query (for querying logs), statistics (for retrieving statistical data), histogram (for displaying log count by time interval) and export (for exporting logs). Defaults to query.").DefaultValue("query").DataType("string").Required(false)).
Param(ws.QueryParameter("workspaces", "A comma-separated list of workspaces. This field restricts the query to specified workspaces. For example, the following filter matches the workspace my-ws and demo-ws: `my-ws,demo-ws`").DataType("string").Required(false)).
Param(ws.QueryParameter("workspace_query", "A comma-separated list of keywords. Differing from **workspaces**, this field performs fuzzy matching on workspaces. For example, the following value limits the query to workspaces whose name contains the word my(My,MY,...) *OR* demo(Demo,DemO,...): `my,demo`.").DataType("string").Required(false)).
Param(ws.QueryParameter("namespaces", "A comma-separated list of namespaces. This field restricts the query to specified namespaces. For example, the following filter matches the namespace my-ns and demo-ns: `my-ns,demo-ns`").DataType("string").Required(false)).
......@@ -179,7 +179,7 @@ func addWebService(c *restful.Container) error {
Writes(v1alpha2.Response{}).
Returns(http.StatusOK, RespOK, v1alpha2.Response{})).
Consumes(restful.MIME_JSON, restful.MIME_XML).
Produces(restful.MIME_JSON)
Produces(restful.MIME_JSON, restful.MIME_OCTET)
c.Add(ws)
return nil
......
......@@ -19,7 +19,10 @@
package logging
import (
"bytes"
"fmt"
"github.com/emicklei/go-restful"
"io"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/api/logging/v1alpha2"
"kubesphere.io/kubesphere/pkg/models/log"
......@@ -30,94 +33,45 @@ import (
"net/http"
"strconv"
"strings"
"time"
)
func LoggingQueryCluster(request *restful.Request, response *restful.Response) {
res, err := logQuery(log.QueryLevelCluster, request)
if err != nil {
response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err)
return
}
if res.Status != http.StatusOK {
response.WriteHeaderAndEntity(res.Status, errors.New(res.Error))
return
param := parseRequest(log.QueryLevelCluster, request)
if param.Operation == v1alpha2.OperationExport {
logExport(param, request, response)
} else {
logQuery(param, response)
}
response.WriteAsJson(res)
}
func LoggingQueryWorkspace(request *restful.Request, response *restful.Response) {
res, err := logQuery(log.QueryLevelWorkspace, request)
if err != nil {
response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err)
return
}
if res.Status != http.StatusOK {
response.WriteHeaderAndEntity(res.Status, errors.New(res.Error))
return
}
response.WriteAsJson(res)
param := parseRequest(log.QueryLevelWorkspace, request)
logQuery(param, response)
}
func LoggingQueryNamespace(request *restful.Request, response *restful.Response) {
res, err := logQuery(log.QueryLevelNamespace, request)
if err != nil {
response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err)
return
}
if res.Status != http.StatusOK {
response.WriteHeaderAndEntity(res.Status, errors.New(res.Error))
return
}
response.WriteAsJson(res)
param := parseRequest(log.QueryLevelNamespace, request)
logQuery(param, response)
}
func LoggingQueryWorkload(request *restful.Request, response *restful.Response) {
res, err := logQuery(log.QueryLevelWorkload, request)
if err != nil {
response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err)
return
}
if res.Status != http.StatusOK {
response.WriteHeaderAndEntity(res.Status, errors.New(res.Error))
return
}
response.WriteAsJson(res)
param := parseRequest(log.QueryLevelWorkload, request)
logQuery(param, response)
}
func LoggingQueryPod(request *restful.Request, response *restful.Response) {
res, err := logQuery(log.QueryLevelPod, request)
if err != nil {
response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err)
return
}
if res.Status != http.StatusOK {
response.WriteHeaderAndEntity(res.Status, errors.New(res.Error))
return
}
response.WriteAsJson(res)
param := parseRequest(log.QueryLevelPod, request)
logQuery(param, response)
}
func LoggingQueryContainer(request *restful.Request, response *restful.Response) {
res, err := logQuery(log.QueryLevelContainer, request)
if err != nil {
response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err)
return
param := parseRequest(log.QueryLevelContainer, request)
if param.Operation == v1alpha2.OperationExport {
logExport(param, request, response)
} else {
logQuery(param, response)
}
if res.Status != http.StatusOK {
response.WriteHeaderAndEntity(res.Status, errors.New(res.Error))
return
}
response.WriteAsJson(res)
}
func LoggingQueryFluentbitOutputs(request *restful.Request, response *restful.Response) {
......@@ -130,7 +84,6 @@ func LoggingQueryFluentbitOutputs(request *restful.Request, response *restful.Re
}
func LoggingInsertFluentbitOutput(request *restful.Request, response *restful.Response) {
var output fb.OutputPlugin
var res *log.FluentbitOutputsResult
......@@ -151,7 +104,6 @@ func LoggingInsertFluentbitOutput(request *restful.Request, response *restful.Re
}
func LoggingUpdateFluentbitOutput(request *restful.Request, response *restful.Response) {
var output fb.OutputPlugin
id := request.PathParameter("output")
......@@ -174,7 +126,6 @@ func LoggingUpdateFluentbitOutput(request *restful.Request, response *restful.Re
}
func LoggingDeleteFluentbitOutput(request *restful.Request, response *restful.Response) {
var res *log.FluentbitOutputsResult
id := request.PathParameter("output")
......@@ -188,22 +139,92 @@ func LoggingDeleteFluentbitOutput(request *restful.Request, response *restful.Re
response.WriteAsJson(res)
}
func logQuery(level log.LogQueryLevel, request *restful.Request) (*v1alpha2.QueryResult, error) {
func logQuery(param v1alpha2.QueryParameters, response *restful.Response) {
es, err := cs.ClientSets().ElasticSearch()
if err != nil {
response.WriteHeaderAndEntity(http.StatusServiceUnavailable, errors.Wrap(err))
return
}
res, err := es.Query(param)
if err != nil {
response.WriteHeaderAndEntity(http.StatusInternalServerError, errors.Wrap(err))
return
}
response.WriteAsJson(res)
}
func logExport(param v1alpha2.QueryParameters, request *restful.Request, response *restful.Response) {
es, err := cs.ClientSets().ElasticSearch()
if err != nil {
klog.Error(err)
return nil, err
response.WriteHeaderAndEntity(http.StatusServiceUnavailable, errors.Wrap(err))
return
}
response.Header().Set("Content-Type", restful.MIME_OCTET)
// keep search context alive for 1m
param.ScrollTimeout = time.Minute
// export 1000 records in every iteration
param.Size = 1000
// from is not allowed in a scroll context
param.From = 0
var scrollId string
// limit to retrieve max 100k records
for i := 0; i < 100; i++ {
var res *v1alpha2.QueryResult
if scrollId == "" {
res, err = es.Query(param)
if err != nil {
response.WriteHeaderAndEntity(http.StatusInternalServerError, errors.Wrap(err))
return
}
} else {
res, err = es.Scroll(scrollId)
if err != nil {
break
}
}
if res.Read == nil || len(res.Read.Records) == 0 {
break
}
output := new(bytes.Buffer)
for _, r := range res.Read.Records {
output.WriteString(fmt.Sprintf(`%s`, stringutils.StripAnsi(r.Log)))
}
_, err = io.Copy(response, output)
if err != nil {
klog.Error(err)
break
}
scrollId = res.Read.ScrollID
select {
case <-request.Request.Context().Done():
break
default:
}
}
if scrollId != "" {
es.ClearScroll(scrollId)
}
}
func parseRequest(level log.LogQueryLevel, request *restful.Request) v1alpha2.QueryParameters {
var param v1alpha2.QueryParameters
switch level {
case log.QueryLevelCluster:
var namespaces []string
param.NamespaceNotFound, namespaces = log.MatchNamespace(stringutils.Split(request.QueryParameter("namespaces"), ","),
stringutils.Split(strings.ToLower(request.QueryParameter("namespace_query")), ","), // case-insensitive
stringutils.Split(strings.ToLower(request.QueryParameter("namespace_query")), ","),
stringutils.Split(request.QueryParameter("workspaces"), ","),
stringutils.Split(strings.ToLower(request.QueryParameter("workspace_query")), ",")) // case-insensitive
stringutils.Split(strings.ToLower(request.QueryParameter("workspace_query")), ","))
param.NamespaceWithCreationTime = log.MakeNamespaceCreationTimeMap(namespaces)
param.WorkloadFilter = stringutils.Split(request.QueryParameter("workloads"), ",")
param.WorkloadQuery = stringutils.Split(request.QueryParameter("workload_query"), ",")
......@@ -214,8 +235,8 @@ func logQuery(level log.LogQueryLevel, request *restful.Request) (*v1alpha2.Quer
case log.QueryLevelWorkspace:
var namespaces []string
param.NamespaceNotFound, namespaces = log.MatchNamespace(stringutils.Split(request.QueryParameter("namespaces"), ","),
stringutils.Split(strings.ToLower(request.QueryParameter("namespace_query")), ","), // case-insensitive
stringutils.Split(request.PathParameter("workspace"), ","), nil) // case-insensitive
stringutils.Split(strings.ToLower(request.QueryParameter("namespace_query")), ","),
stringutils.Split(request.PathParameter("workspace"), ","), nil)
param.NamespaceWithCreationTime = log.MakeNamespaceCreationTimeMap(namespaces)
param.WorkloadFilter = stringutils.Split(request.QueryParameter("workloads"), ",")
param.WorkloadQuery = stringutils.Split(request.QueryParameter("workload_query"), ",")
......@@ -254,21 +275,31 @@ func logQuery(level log.LogQueryLevel, request *restful.Request) (*v1alpha2.Quer
}
param.LogQuery = stringutils.Split(request.QueryParameter("log_query"), ",")
param.Operation = request.QueryParameter("operation")
param.Interval = request.QueryParameter("interval")
param.StartTime = request.QueryParameter("start_time")
param.EndTime = request.QueryParameter("end_time")
param.Sort = request.QueryParameter("sort")
switch request.QueryParameter("operation") {
case "statistics":
param.Operation = v1alpha2.OperationStatistics
case "histogram":
param.Operation = v1alpha2.OperationHistogram
case "export":
param.Operation = v1alpha2.OperationExport
default:
param.Operation = v1alpha2.OperationQuery
}
var err error
param.From, err = strconv.ParseInt(request.QueryParameter("from"), 10, 64)
if err != nil {
param.From = 0
}
param.Size, err = strconv.ParseInt(request.QueryParameter("size"), 10, 64)
if err != nil {
param.Size = 10
}
return es.Query(param), nil
return param
}
......@@ -339,6 +339,23 @@ func ListDevopsRules(req *restful.Request, resp *restful.Response) {
}
func LogQuery(req *restful.Request, resp *restful.Response) {
req, err := regenerateLoggingRequest(req)
switch {
case err != nil:
resp.WriteHeaderAndEntity(http.StatusInternalServerError, errors.Wrap(err))
case req != nil:
logging.LoggingQueryCluster(req, resp)
default:
if req.QueryParameter("operation") == "export" {
resp.Write(nil)
} else {
resp.WriteAsJson(loggingv1alpha2.QueryResult{})
}
}
}
// override namespace query conditions
func regenerateLoggingRequest(req *restful.Request) (*restful.Request, error) {
username := req.HeaderParameter(constants.UserNameHeader)
......@@ -348,9 +365,8 @@ func LogQuery(req *restful.Request, resp *restful.Response) {
clusterRules, err := iam.GetUserClusterRules(username)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, errors.Wrap(err))
klog.Errorln(err)
return
return nil, err
}
hasClusterLogAccess := iam.RulesMatchesRequired(clusterRules, rbacv1.PolicyRule{Verbs: []string{"get"}, Resources: []string{"*"}, APIGroups: []string{"logging.kubesphere.io"}})
......@@ -361,9 +377,8 @@ func LogQuery(req *restful.Request, resp *restful.Response) {
namespaces := make([]string, 0)
roles, err := iam.GetUserRoles("", username)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, errors.Wrap(err))
klog.Errorln(err)
return
return nil, err
}
for _, role := range roles {
if !sliceutil.HasString(namespaces, role.Namespace) && iam.RulesMatchesRequired(role.Rules, rbacv1.PolicyRule{Verbs: []string{"get"}, Resources: []string{"*"}, APIGroups: []string{"logging.kubesphere.io"}}) {
......@@ -374,17 +389,13 @@ func LogQuery(req *restful.Request, resp *restful.Response) {
// if the user belongs to no namespace
// then no log visible
if len(namespaces) == 0 {
res := loggingv1alpha2.QueryResult{Status: http.StatusOK}
resp.WriteAsJson(res)
return
return nil, nil
} else if len(queryNamespaces) == 1 && queryNamespaces[0] == "" {
values.Set("namespaces", strings.Join(namespaces, ","))
} else {
inter := intersection(queryNamespaces, namespaces)
if len(inter) == 0 {
res := loggingv1alpha2.QueryResult{Status: http.StatusOK}
resp.WriteAsJson(res)
return
return nil, nil
}
values.Set("namespaces", strings.Join(inter, ","))
}
......@@ -394,7 +405,7 @@ func LogQuery(req *restful.Request, resp *restful.Response) {
// forward the request to logging model
newHttpRequest, _ := http.NewRequest(http.MethodGet, newUrl.String(), nil)
logging.LoggingQueryCluster(restful.NewRequest(newHttpRequest), resp)
return restful.NewRequest(newHttpRequest), nil
}
func intersection(s1, s2 []string) (inter []string) {
......
......@@ -21,8 +21,6 @@ import (
v5 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v5"
v6 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v6"
v7 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v7"
"net/http"
"strconv"
"strings"
"time"
......@@ -30,10 +28,6 @@ import (
)
const (
OperationQuery int = iota
OperationStatistics
OperationHistogram
matchPhrase = iota
matchPhrasePrefix
regexpQuery
......@@ -137,7 +131,7 @@ func detectVersionMajor(host string) (string, error) {
return v, nil
}
func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) {
func createQueryRequest(param v1alpha2.QueryParameters) ([]byte, error) {
var request v1alpha2.Request
var mainBoolQuery v1alpha2.BoolFilter
......@@ -190,16 +184,12 @@ func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) {
rangeQuery := v1alpha2.RangeQuery{RangeSpec: v1alpha2.RangeSpec{TimeRange: v1alpha2.TimeRange{Gte: param.StartTime, Lte: param.EndTime}}}
mainBoolQuery.Filter = append(mainBoolQuery.Filter, rangeQuery)
var operation int
if param.Operation == "statistics" {
operation = OperationStatistics
if param.Operation == v1alpha2.OperationStatistics {
containerAgg := v1alpha2.AggField{Field: "kubernetes.docker_id.keyword"}
statisticAggs := v1alpha2.StatisticsAggs{ContainerAgg: v1alpha2.ContainerAgg{Cardinality: containerAgg}}
request.Aggs = statisticAggs
request.Size = 0
} else if param.Operation == "histogram" {
operation = OperationHistogram
} else if param.Operation == v1alpha2.OperationHistogram {
var interval string
if param.Interval != "" {
interval = param.Interval
......@@ -210,7 +200,6 @@ func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) {
request.Aggs = v1alpha2.HistogramAggs{HistogramAgg: v1alpha2.HistogramAgg{DateHistogram: v1alpha2.DateHistogram{Field: "time", Interval: interval}}}
request.Size = 0
} else {
operation = OperationQuery
request.From = param.From
request.Size = param.Size
var order string
......@@ -232,9 +221,7 @@ func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) {
request.MainQuery = v1alpha2.BoolQuery{Bool: mainBoolQuery}
queryRequest, err := json.Marshal(request)
return operation, queryRequest, err
return json.Marshal(request)
}
func makeBoolShould(queryType int, field string, list []string) v1alpha2.BoolQuery {
......@@ -288,46 +275,14 @@ func makePodNameRegexp(workloadName string) string {
return regexp
}
func calcTimestamp(input string) int64 {
var t time.Time
var err error
var ret int64
ret = 0
t, err = time.Parse(time.RFC3339, input)
if err != nil {
var i int64
i, err = strconv.ParseInt(input, 10, 64)
if err == nil {
ret = time.Unix(i/1000, (i%1000)*1000000).UnixNano() / 1000000
}
} else {
ret = t.UnixNano() / 1000000
}
return ret
}
func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.QueryParameters, body []byte) *v1alpha2.QueryResult {
func (c *ElasticSearchClient) parseQueryResult(operation int, body []byte) (*v1alpha2.QueryResult, error) {
var queryResult v1alpha2.QueryResult
var response v1alpha2.Response
err := jsonIter.Unmarshal(body, &response)
if err != nil {
klog.Errorln(err)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return &queryResult
}
if response.Status != 0 {
//Elastic error, eg, es_rejected_execute_exception
err := "The query failed with no response"
queryResult.Status = response.Status
queryResult.Error = err
klog.Errorln(err)
return &queryResult
klog.Error(err)
return nil, err
}
if response.Shards.Successful != response.Shards.Total {
......@@ -337,14 +292,12 @@ func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.Que
}
switch operation {
case OperationQuery:
case v1alpha2.OperationQuery:
var readResult v1alpha2.ReadResult
readResult.Total = c.client.GetTotalHitCount(response.Hits.Total)
readResult.From = param.From
readResult.Size = param.Size
for _, hit := range response.Hits.Hits {
var logRecord v1alpha2.LogRecord
logRecord.Time = calcTimestamp(hit.Source.Time)
logRecord.Time = hit.Source.Time
logRecord.Log = hit.Source.Log
logRecord.Namespace = hit.Source.Kubernetes.Namespace
logRecord.Pod = hit.Source.Kubernetes.Pod
......@@ -354,32 +307,23 @@ func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.Que
readResult.Records = append(readResult.Records, logRecord)
}
queryResult.Read = &readResult
case OperationStatistics:
case v1alpha2.OperationStatistics:
var statisticsResponse v1alpha2.StatisticsResponseAggregations
err := jsonIter.Unmarshal(response.Aggregations, &statisticsResponse)
if err != nil && response.Aggregations != nil {
klog.Errorln(err)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return &queryResult
klog.Error(err)
return nil, err
}
queryResult.Statistics = &v1alpha2.StatisticsResult{Containers: statisticsResponse.ContainerCount.Value, Logs: c.client.GetTotalHitCount(response.Hits.Total)}
case OperationHistogram:
case v1alpha2.OperationHistogram:
var histogramResult v1alpha2.HistogramResult
histogramResult.Total = c.client.GetTotalHitCount(response.Hits.Total)
histogramResult.StartTime = calcTimestamp(param.StartTime)
histogramResult.EndTime = calcTimestamp(param.EndTime)
histogramResult.Interval = param.Interval
var histogramAggregations v1alpha2.HistogramAggregations
err = jsonIter.Unmarshal(response.Aggregations, &histogramAggregations)
if err != nil && response.Aggregations != nil {
klog.Errorln(err)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return &queryResult
klog.Error(err)
return nil, err
}
for _, histogram := range histogramAggregations.HistogramAggregation.Histograms {
var histogramRecord v1alpha2.HistogramRecord
......@@ -390,58 +334,61 @@ func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.Que
}
queryResult.Histogram = &histogramResult
case v1alpha2.OperationExport:
var readResult v1alpha2.ReadResult
readResult.ScrollID = response.ScrollId
for _, hit := range response.Hits.Hits {
var logRecord v1alpha2.LogRecord
logRecord.Log = hit.Source.Log
readResult.Records = append(readResult.Records, logRecord)
}
queryResult.Read = &readResult
}
queryResult.Status = http.StatusOK
return &queryResult
return &queryResult, nil
}
func (c *ElasticSearchClient) Query(param v1alpha2.QueryParameters) *v1alpha2.QueryResult {
func (c *ElasticSearchClient) Query(param v1alpha2.QueryParameters) (*v1alpha2.QueryResult, error) {
var queryResult = new(v1alpha2.QueryResult)
if param.NamespaceNotFound {
queryResult = new(v1alpha2.QueryResult)
queryResult.Status = http.StatusOK
switch param.Operation {
case "statistics":
case v1alpha2.OperationStatistics:
queryResult.Statistics = new(v1alpha2.StatisticsResult)
case "histogram":
queryResult.Histogram = &v1alpha2.HistogramResult{
StartTime: calcTimestamp(param.StartTime),
EndTime: calcTimestamp(param.EndTime),
Interval: param.Interval}
case v1alpha2.OperationHistogram:
queryResult.Histogram = new(v1alpha2.HistogramResult)
default:
queryResult.Read = new(v1alpha2.ReadResult)
}
return queryResult
return queryResult, nil
}
if c.client == nil {
queryResult.Status = http.StatusBadRequest
queryResult.Error = "can not create elastic search client"
return queryResult
query, err := createQueryRequest(param)
if err != nil {
klog.Error(err)
return nil, err
}
operation, query, err := createQueryRequest(param)
body, err := c.client.Search(query, param.ScrollTimeout)
if err != nil {
klog.Errorln(err)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return queryResult
klog.Error(err)
return nil, err
}
body, err := c.client.Search(query)
return c.parseQueryResult(param.Operation, body)
}
func (c *ElasticSearchClient) Scroll(scrollId string) (*v1alpha2.QueryResult, error) {
body, err := c.client.Scroll(scrollId, time.Minute)
if err != nil {
klog.Errorln(err)
queryResult = new(v1alpha2.QueryResult)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return queryResult
klog.Error(err)
return nil, err
}
return c.parseQueryResult(v1alpha2.OperationExport, body)
}
queryResult = c.parseQueryResult(operation, param, body)
return queryResult
func (c *ElasticSearchClient) ClearScroll(scrollId string) {
c.client.ClearScroll(scrollId)
}
package esclient
import "time"
type Client interface {
// Perform Search API
Search(body []byte) ([]byte, error)
Search(body []byte, scrollTimeout time.Duration) ([]byte, error)
Scroll(scrollId string, scrollTimeout time.Duration) ([]byte, error)
ClearScroll(scrollId string)
GetTotalHitCount(v interface{}) int64
}
......@@ -6,7 +6,9 @@ import (
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v5"
"github.com/elastic/go-elasticsearch/v5/esapi"
"io/ioutil"
"time"
)
type Elastic struct {
......@@ -15,7 +17,6 @@ type Elastic struct {
}
func New(address string, index string) *Elastic {
client, _ := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{address},
})
......@@ -23,33 +24,60 @@ func New(address string, index string) *Elastic {
return &Elastic{client: client, index: index}
}
func (e *Elastic) Search(body []byte) ([]byte, error) {
func (e *Elastic) Search(body []byte, scrollTimeout time.Duration) ([]byte, error) {
response, err := e.client.Search(
e.client.Search.WithContext(context.Background()),
e.client.Search.WithIndex(fmt.Sprintf("%s*", e.index)),
e.client.Search.WithBody(bytes.NewBuffer(body)),
)
e.client.Search.WithScroll(scrollTimeout))
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(response.Body).Decode(&e); err != nil {
return nil, err
} else {
// Print the response status and error information.
e, _ := e["error"].(map[string]interface{})
return nil, fmt.Errorf("[%s] %s: %s", response.Status(), e["type"], e["reason"])
}
return nil, parseError(response)
}
return ioutil.ReadAll(response.Body)
}
func (e *Elastic) Scroll(scrollId string, scrollTimeout time.Duration) ([]byte, error) {
response, err := e.client.Scroll(
e.client.Scroll.WithContext(context.Background()),
e.client.Scroll.WithScrollID(scrollId),
e.client.Scroll.WithScroll(scrollTimeout))
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.IsError() {
return nil, parseError(response)
}
return ioutil.ReadAll(response.Body)
}
func (e *Elastic) ClearScroll(scrollId string) {
response, _ := e.client.ClearScroll(
e.client.ClearScroll.WithContext(context.Background()),
e.client.ClearScroll.WithScrollID(scrollId))
defer response.Body.Close()
}
func (e *Elastic) GetTotalHitCount(v interface{}) int64 {
f, _ := v.(float64)
return int64(f)
}
func parseError(response *esapi.Response) error {
var e map[string]interface{}
if err := json.NewDecoder(response.Body).Decode(&e); err != nil {
return err
} else {
// Print the response status and error information.
e, _ := e["error"].(map[string]interface{})
return fmt.Errorf("%s: %s", e["type"], e["reason"])
}
}
......@@ -6,7 +6,9 @@ import (
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"io/ioutil"
"time"
)
type Elastic struct {
......@@ -15,7 +17,6 @@ type Elastic struct {
}
func New(address string, index string) *Elastic {
client, _ := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{address},
})
......@@ -23,33 +24,60 @@ func New(address string, index string) *Elastic {
return &Elastic{Client: client, index: index}
}
func (e *Elastic) Search(body []byte) ([]byte, error) {
func (e *Elastic) Search(body []byte, scrollTimeout time.Duration) ([]byte, error) {
response, err := e.Client.Search(
e.Client.Search.WithContext(context.Background()),
e.Client.Search.WithIndex(fmt.Sprintf("%s*", e.index)),
e.Client.Search.WithBody(bytes.NewBuffer(body)),
)
e.Client.Search.WithScroll(scrollTimeout))
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(response.Body).Decode(&e); err != nil {
return nil, err
} else {
// Print the response status and error information.
e, _ := e["error"].(map[string]interface{})
return nil, fmt.Errorf("[%s] %s: %s", response.Status(), e["type"], e["reason"])
}
return nil, parseError(response)
}
return ioutil.ReadAll(response.Body)
}
func (e *Elastic) Scroll(scrollId string, scrollTimeout time.Duration) ([]byte, error) {
response, err := e.Client.Scroll(
e.Client.Scroll.WithContext(context.Background()),
e.Client.Scroll.WithScrollID(scrollId),
e.Client.Scroll.WithScroll(scrollTimeout))
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.IsError() {
return nil, parseError(response)
}
return ioutil.ReadAll(response.Body)
}
func (e *Elastic) ClearScroll(scrollId string) {
response, _ := e.Client.ClearScroll(
e.Client.ClearScroll.WithContext(context.Background()),
e.Client.ClearScroll.WithScrollID(scrollId))
defer response.Body.Close()
}
func (e *Elastic) GetTotalHitCount(v interface{}) int64 {
f, _ := v.(float64)
return int64(f)
}
func parseError(response *esapi.Response) error {
var e map[string]interface{}
if err := json.NewDecoder(response.Body).Decode(&e); err != nil {
return err
} else {
// Print the response status and error information.
e, _ := e["error"].(map[string]interface{})
return fmt.Errorf("%s: %s", e["type"], e["reason"])
}
}
......@@ -6,7 +6,9 @@ import (
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"io/ioutil"
"time"
)
type Elastic struct {
......@@ -15,7 +17,6 @@ type Elastic struct {
}
func New(address string, index string) *Elastic {
client, _ := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{address},
})
......@@ -23,35 +24,63 @@ func New(address string, index string) *Elastic {
return &Elastic{client: client, index: index}
}
func (e *Elastic) Search(body []byte) ([]byte, error) {
func (e *Elastic) Search(body []byte, scrollTimeout time.Duration) ([]byte, error) {
response, err := e.client.Search(
e.client.Search.WithContext(context.Background()),
e.client.Search.WithIndex(fmt.Sprintf("%s*", e.index)),
e.client.Search.WithTrackTotalHits(true),
e.client.Search.WithBody(bytes.NewBuffer(body)),
)
e.client.Search.WithScroll(scrollTimeout))
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(response.Body).Decode(&e); err != nil {
return nil, err
} else {
// Print the response status and error information.
e, _ := e["error"].(map[string]interface{})
return nil, fmt.Errorf("[%s] %s: %s", response.Status(), e["type"], e["reason"])
}
return nil, parseError(response)
}
return ioutil.ReadAll(response.Body)
}
func (e *Elastic) Scroll(scrollId string, scrollTimeout time.Duration) ([]byte, error) {
response, err := e.client.Scroll(
e.client.Scroll.WithContext(context.Background()),
e.client.Scroll.WithScrollID(scrollId),
e.client.Scroll.WithScroll(scrollTimeout))
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.IsError() {
return nil, parseError(response)
}
b, err := ioutil.ReadAll(response.Body)
return b, err
}
func (e *Elastic) ClearScroll(scrollId string) {
response, _ := e.client.ClearScroll(
e.client.ClearScroll.WithContext(context.Background()),
e.client.ClearScroll.WithScrollID(scrollId))
defer response.Body.Close()
}
func (e *Elastic) GetTotalHitCount(v interface{}) int64 {
m, _ := v.(map[string]interface{})
f, _ := m["value"].(float64)
return int64(f)
}
func parseError(response *esapi.Response) error {
var e map[string]interface{}
if err := json.NewDecoder(response.Body).Decode(&e); err != nil {
return err
} else {
// Print the response status and error information.
e, _ := e["error"].(map[string]interface{})
return fmt.Errorf("%s: %s", e["type"], e["reason"])
}
}
......@@ -14,12 +14,17 @@ limitations under the License.
package stringutils
import (
"regexp"
"strings"
"unicode/utf8"
"github.com/asaskevich/govalidator"
)
const ansi = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))"
var re = regexp.MustCompile(ansi)
// Creates an slice of slice values not included in the other given slice.
func Diff(base, exclude []string) (result []string) {
excludeMap := make(map[string]bool)
......@@ -83,3 +88,7 @@ func Split(str string, sep string) []string {
}
return strings.Split(str, sep)
}
func StripAnsi(str string) string {
return re.ReplaceAllString(str, "")
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册