diff --git a/cmd/ks-apiserver/apiserver.go b/cmd/ks-apiserver/apiserver.go index 623c22782de8d0aab3da0902f8179e4e8e8b35ee..10577b767eb04f76649a3a1348705d207e9d66d0 100644 --- a/cmd/ks-apiserver/apiserver.go +++ b/cmd/ks-apiserver/apiserver.go @@ -21,6 +21,7 @@ import ( "kubesphere.io/kubesphere/cmd/ks-apiserver/app" "log" // Install apis + _ "kubesphere.io/kubesphere/pkg/apis/logging/install" _ "kubesphere.io/kubesphere/pkg/apis/metrics/install" _ "kubesphere.io/kubesphere/pkg/apis/monitoring/install" _ "kubesphere.io/kubesphere/pkg/apis/operations/install" diff --git a/cmd/ks-apiserver/app/server.go b/cmd/ks-apiserver/app/server.go index f62d2e6fe94eedf4f7d05c8cd3a5b27c82856cf1..ff18a2f2db24434ffae657a70d45ae39af4b5f69 100644 --- a/cmd/ks-apiserver/app/server.go +++ b/cmd/ks-apiserver/app/server.go @@ -21,6 +21,7 @@ import ( goflag "flag" "fmt" "github.com/golang/glog" + "github.com/json-iterator/go" kconfig "github.com/kiali/kiali/config" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -28,11 +29,17 @@ import ( "kubesphere.io/kubesphere/pkg/apiserver/runtime" "kubesphere.io/kubesphere/pkg/filter" "kubesphere.io/kubesphere/pkg/informers" + logging "kubesphere.io/kubesphere/pkg/models/log" "kubesphere.io/kubesphere/pkg/signals" + es "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" + fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit" + "kubesphere.io/kubesphere/pkg/simple/client/mysql" "log" "net/http" ) +var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary + func NewAPIServerCommand() *cobra.Command { s := options.NewServerRunOptions() @@ -74,6 +81,7 @@ func Run(s *options.ServerRunOptions) error { } } + initializeESClientConfig() initializeKialiConfig(s) if s.GenericServerRunOptions.InsecurePort != 0 { @@ -106,6 +114,35 @@ func initializeKialiConfig(s *options.ServerRunOptions) { kconfig.Set(config) } +func initializeESClientConfig() { + + var outputs []logging.OutputDBBinding + var configs *es.ESConfigs + + db := mysql.Client() + if 
!db.HasTable(&logging.OutputDBBinding{}) { + // Panic + log.Fatal("Flyway migration is not completed") + } + + err := db.Find(&outputs).Error + if err != nil { + return + } + + // Retrieve es-type output from db + var params []fb.Parameter + for _, output := range outputs { + err := jsonIter.UnmarshalFromString(output.Parameters, &params) + if err == nil { + if configs = logging.ParseEsOutputParams(params); configs != nil { + configs.WriteESConfigs() + return + } + } + } +} + + func waitForResourceSync() { stopChan := signals.SetupSignalHandler() diff --git a/pkg/apis/logging/install/install.go b/pkg/apis/logging/install/install.go new file mode 100644 index 0000000000000000000000000000000000000000..27e393b5020be36bf1ce5deffed135e1fa1e426b --- /dev/null +++ b/pkg/apis/logging/install/install.go @@ -0,0 +1,33 @@ +/* + + Copyright 2019 The KubeSphere Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +*/ +package install + +import ( + "github.com/emicklei/go-restful" + urlruntime "k8s.io/apimachinery/pkg/util/runtime" + loggingv1alpha2 "kubesphere.io/kubesphere/pkg/apis/logging/v1alpha2" + "kubesphere.io/kubesphere/pkg/apiserver/runtime" +) + +func init() { + Install(runtime.Container) +} + +func Install(container *restful.Container) { + urlruntime.Must(loggingv1alpha2.AddToContainer(container)) +} diff --git a/pkg/apis/logging/v1alpha2/register.go b/pkg/apis/logging/v1alpha2/register.go new file mode 100644 index 0000000000000000000000000000000000000000..2247e7f764a4b2f0e944954398444dd7849a5cb2 --- /dev/null +++ b/pkg/apis/logging/v1alpha2/register.go @@ -0,0 +1,217 @@ +/* + + Copyright 2019 The KubeSphere Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +*/ +package v1alpha2 + +import ( + "github.com/emicklei/go-restful" + "github.com/emicklei/go-restful-openapi" + "k8s.io/apimachinery/pkg/runtime/schema" + "kubesphere.io/kubesphere/pkg/apiserver/logging" + "kubesphere.io/kubesphere/pkg/apiserver/runtime" + "kubesphere.io/kubesphere/pkg/filter" +) + +const GroupName = "logging.kubesphere.io" + +var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha2"} + +var ( + WebServiceBuilder = runtime.NewContainerBuilder(addWebService) + AddToContainer = WebServiceBuilder.AddToContainer +) + +func addWebService(c *restful.Container) error { + ws := runtime.NewWebService(GroupVersion) + tags := []string{"Logging"} + + ws.Route(ws.GET("/cluster").To(logging.LoggingQueryCluster). + Filter(filter.Logging). + Doc("cluster level log query"). + Param(ws.QueryParameter("operation", "operation: query statistics").DataType("string").Required(true)). + Param(ws.QueryParameter("workspaces", "workspaces specify").DataType("string").Required(false)). + Param(ws.QueryParameter("workspace_query", "workspace query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("namespaces", "namespaces specify").DataType("string").Required(false)). + Param(ws.QueryParameter("namespace_query", "namespace query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("workloads", "workloads specify").DataType("string").Required(false)). + Param(ws.QueryParameter("workload_query", "workload query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("pods", "pods specify").DataType("string").Required(false)). + Param(ws.QueryParameter("pod_query", "pod query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("containers", "containers specify").DataType("string").Required(false)). + Param(ws.QueryParameter("container_query", "container query keywords").DataType("string").Required(false)). 
+ Param(ws.QueryParameter("log_query", "log query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("interval", "interval of time histogram").DataType("string").Required(false)). + Param(ws.QueryParameter("start_time", "range start time").DataType("string").Required(false)). + Param(ws.QueryParameter("end_time", "range end time").DataType("string").Required(false)). + Param(ws.QueryParameter("sort", "sort method").DataType("string").Required(false)). + Param(ws.QueryParameter("from", "begin index of result returned").DataType("int").Required(true)). + Param(ws.QueryParameter("size", "size of result returned").DataType("int").Required(true)). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) + + ws.Route(ws.GET("/workspaces/{workspace}").To(logging.LoggingQueryWorkspace). + Filter(filter.Logging). + Doc("workspace level log query"). + Param(ws.PathParameter("workspace", "workspace specify").DataType("string").Required(true)). + Param(ws.QueryParameter("operation", "operation: query statistics").DataType("string").Required(true)). + Param(ws.QueryParameter("namespaces", "namespaces specify").DataType("string").Required(false)). + Param(ws.QueryParameter("namespace_query", "namespace query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("workloads", "workloads specify").DataType("string").Required(false)). + Param(ws.QueryParameter("workload_query", "workload query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("pods", "pods specify").DataType("string").Required(false)). + Param(ws.QueryParameter("pod_query", "pod query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("containers", "containers specify").DataType("string").Required(false)). + Param(ws.QueryParameter("container_query", "container query keywords").DataType("string").Required(false)). 
+ Param(ws.QueryParameter("log_query", "log query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("interval", "interval of time histogram").DataType("string").Required(false)). + Param(ws.QueryParameter("start_time", "range start time").DataType("string").Required(false)). + Param(ws.QueryParameter("end_time", "range end time").DataType("string").Required(false)). + Param(ws.QueryParameter("sort", "sort method").DataType("string").Required(false)). + Param(ws.QueryParameter("from", "begin index of result returned").DataType("int").Required(true)). + Param(ws.QueryParameter("size", "size of result returned").DataType("int").Required(true)). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) + + ws.Route(ws.GET("/namespaces/{namespace}").To(logging.LoggingQueryNamespace). + Filter(filter.Logging). + Doc("namespace level log query"). + Param(ws.PathParameter("namespace", "namespace specify").DataType("string").Required(true)). + Param(ws.QueryParameter("operation", "operation: query statistics").DataType("string").Required(true)). + Param(ws.QueryParameter("workloads", "workloads specify").DataType("string").Required(false)). + Param(ws.QueryParameter("workload_query", "workload query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("pods", "pods specify").DataType("string").Required(false)). + Param(ws.QueryParameter("pod_query", "pod query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("containers", "containers specify").DataType("string").Required(false)). + Param(ws.QueryParameter("container_query", "container query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("log_query", "log query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("interval", "interval of time histogram").DataType("string").Required(false)). 
+ Param(ws.QueryParameter("start_time", "range start time").DataType("string").Required(false)). + Param(ws.QueryParameter("end_time", "range end time").DataType("string").Required(false)). + Param(ws.QueryParameter("sort", "sort method").DataType("string").Required(false)). + Param(ws.QueryParameter("from", "begin index of result returned").DataType("int").Required(true)). + Param(ws.QueryParameter("size", "size of result returned").DataType("int").Required(true)). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) + + ws.Route(ws.GET("/namespaces/{namespace}/workloads/{workload}").To(logging.LoggingQueryWorkload). + Filter(filter.Logging). + Doc("workload level log query"). + Param(ws.PathParameter("namespace", "namespace specify").DataType("string").Required(true)). + Param(ws.PathParameter("workload", "workload specify").DataType("string").Required(true)). + Param(ws.QueryParameter("operation", "operation: query statistics").DataType("string").Required(true)). + Param(ws.QueryParameter("pods", "pods specify").DataType("string").Required(false)). + Param(ws.QueryParameter("pod_query", "pod query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("containers", "containers specify").DataType("string").Required(false)). + Param(ws.QueryParameter("container_query", "container query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("log_query", "log query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("interval", "interval of time histogram").DataType("string").Required(false)). + Param(ws.QueryParameter("start_time", "range start time").DataType("string").Required(false)). + Param(ws.QueryParameter("end_time", "range end time").DataType("string").Required(false)). + Param(ws.QueryParameter("sort", "sort method").DataType("string").Required(false)). 
+ Param(ws.QueryParameter("from", "begin index of result returned").DataType("int").Required(true)). + Param(ws.QueryParameter("size", "size of result returned").DataType("int").Required(true)). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) + + ws.Route(ws.GET("/namespaces/{namespace}/pods/{pod}").To(logging.LoggingQueryPod). + Filter(filter.Logging). + Doc("pod level log query"). + Param(ws.PathParameter("namespace", "namespace specify").DataType("string").Required(true)). + Param(ws.PathParameter("pod", "pod specify").DataType("string").Required(true)). + Param(ws.QueryParameter("operation", "operation: query statistics").DataType("string").Required(true)). + Param(ws.QueryParameter("containers", "containers specify").DataType("string").Required(false)). + Param(ws.QueryParameter("container_query", "container query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("log_query", "log query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("interval", "interval of time histogram").DataType("string").Required(false)). + Param(ws.QueryParameter("start_time", "range start time").DataType("string").Required(false)). + Param(ws.QueryParameter("end_time", "range end time").DataType("string").Required(false)). + Param(ws.QueryParameter("sort", "sort method").DataType("string").Required(false)). + Param(ws.QueryParameter("from", "begin index of result returned").DataType("int").Required(true)). + Param(ws.QueryParameter("size", "size of result returned").DataType("int").Required(true)). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) + + ws.Route(ws.GET("/namespaces/{namespace}/pods/{pod}/containers/{container}").To(logging.LoggingQueryContainer). + Filter(filter.Logging). + Doc("container level log query"). 
+ Param(ws.PathParameter("namespace", "namespace specify").DataType("string").Required(true)). + Param(ws.PathParameter("pod", "pod specify").DataType("string").Required(true)). + Param(ws.PathParameter("container", "container specify").DataType("string").Required(true)). + Param(ws.QueryParameter("operation", "operation: query statistics").DataType("string").Required(true)). + Param(ws.QueryParameter("log_query", "log query keywords").DataType("string").Required(false)). + Param(ws.QueryParameter("interval", "interval of time histogram").DataType("string").Required(false)). + Param(ws.QueryParameter("start_time", "range start time").DataType("string").Required(false)). + Param(ws.QueryParameter("end_time", "range end time").DataType("string").Required(false)). + Param(ws.QueryParameter("sort", "sort method").DataType("string").Required(false)). + Param(ws.QueryParameter("from", "begin index of result returned").DataType("int").Required(true)). + Param(ws.QueryParameter("size", "size of result returned").DataType("int").Required(true)). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) + + ws.Route(ws.GET("/fluentbit/filters").To(logging.LoggingQueryFluentbitFilters). + Filter(filter.Logging). + Doc("log fluent-bit filters query"). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) + + ws.Route(ws.POST("/fluentbit/filters").To(logging.LoggingUpdateFluentbitFilters). + Filter(filter.Logging). + Doc("log fluent-bit filters update"). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) + + ws.Route(ws.GET("/fluentbit/outputs").To(logging.LoggingQueryFluentbitOutputs). + Filter(filter.Logging). + Doc("log fluent-bit outputs query"). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). 
+ Produces(restful.MIME_JSON) + + ws.Route(ws.POST("/fluentbit/outputs").To(logging.LoggingInsertFluentbitOutput). + Filter(filter.Logging). + Doc("log fluent-bit outputs insert"). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) + + ws.Route(ws.POST("/fluentbit/outputs/{output}").To(logging.LoggingUpdateFluentbitOutput). + Filter(filter.Logging). + Doc("log fluent-bit outputs update"). + Param(ws.PathParameter("output", "output id").DataType("int").Required(true)). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) + + ws.Route(ws.DELETE("/fluentbit/outputs/{output}").To(logging.LoggingDeleteFluentbitOutput). + Filter(filter.Logging). + Doc("log fluent-bit outputs delete"). + Param(ws.PathParameter("output", "output id").DataType("int").Required(true)). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) + + c.Add(ws) + return nil +} diff --git a/pkg/apis/monitoring/v1alpha2/register.go b/pkg/apis/monitoring/v1alpha2/register.go index ae0ca055f81260eda4886b6fc0d0aadaa33f4c9d..d0bb1dfcf0a6e9cebc22c221f2e078818c5bee88 100644 --- a/pkg/apis/monitoring/v1alpha2/register.go +++ b/pkg/apis/monitoring/v1alpha2/register.go @@ -39,168 +39,196 @@ func addWebService(c *restful.Container) error { tags := []string{"Monitoring"} - ws.Route(ws.GET("/clusters").To(monitoring.MonitorCluster). + ws.Route(ws.GET("/cluster").To(monitoring.MonitorCluster). Doc("monitor cluster level metrics"). - Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("cluster_cpu_utilisation")). - Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("cluster_cpu_utilisation")). 
+ Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) ws.Route(ws.GET("/nodes").To(monitoring.MonitorNode). Doc("monitor nodes level metrics"). - Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("node_cpu_utilisation")). - Param(ws.QueryParameter("nodes_filter", "node re2 expression filter").Required(false).DefaultValue("")). - Param(ws.QueryParameter("sort_metric", "sort metric").Required(false)). - Param(ws.QueryParameter("sort_type", "ascending descending order").Required(false)). - Param(ws.QueryParameter("page", "page number").Required(false).DefaultValue("1")). - Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("4")). - Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("node_cpu_utilisation")). + Param(ws.QueryParameter("resources_filter", "node re2 expression filter").DataType("string").Required(false).DefaultValue("")). + Param(ws.QueryParameter("sort_metric", "sort metric").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). + Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). + Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) ws.Route(ws.GET("/nodes/{node}").To(monitoring.MonitorNode). Doc("monitor specific node level metrics"). - Param(ws.PathParameter("node", "specific node").Required(true).DefaultValue("")). 
- Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").Required(true).DefaultValue("node_cpu_utilisation")). - Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.PathParameter("node", "specific node").DataType("string").Required(true).DefaultValue("")). + Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").DataType("string").Required(true).DefaultValue("node_cpu_utilisation")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) ws.Route(ws.GET("/namespaces").To(monitoring.MonitorNamespace). Doc("monitor namespaces level metrics"). - Param(ws.QueryParameter("namespaces_filter", "namespaces re2 expression filter").Required(false).DefaultValue("")). - Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("namespace_memory_utilisation")). - Param(ws.QueryParameter("sort_metric", "sort metric").Required(false)). - Param(ws.QueryParameter("sort_type", "ascending descending order").Required(false)). - Param(ws.QueryParameter("page", "page number").Required(false).DefaultValue("1")). - Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("4")). - Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.QueryParameter("resources_filter", "namespaces re2 expression filter").DataType("string").Required(false).DefaultValue("")). + Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("namespace_memory_utilisation")). + Param(ws.QueryParameter("sort_metric", "sort metric").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). + Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). 
+ Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) ws.Route(ws.GET("/namespaces/{namespace}").To(monitoring.MonitorNamespace). Doc("monitor specific namespace level metrics"). - Param(ws.PathParameter("namespace", "specific namespace").Required(true).DefaultValue("monitoring")). - Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").Required(true).DefaultValue("namespace_memory_utilisation")). - Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.PathParameter("namespace", "specific namespace").DataType("string").Required(true).DefaultValue("monitoring")). + Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").DataType("string").Required(true).DefaultValue("namespace_memory_utilisation")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) ws.Route(ws.GET("/namespaces/{namespace}/pods").To(monitoring.MonitorPod). Doc("monitor pods level metrics"). - Param(ws.PathParameter("namespace", "specific namespace").Required(true).DefaultValue("monitoring")). - Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("pod_memory_utilisation_wo_cache")). - Param(ws.QueryParameter("pods_filter", "pod re2 expression filter").Required(false).DefaultValue("")). - Param(ws.QueryParameter("sort_metric", "sort metric").Required(false)). - Param(ws.QueryParameter("sort_type", "ascending descending order").Required(false)). - Param(ws.QueryParameter("page", "page number").Required(false).DefaultValue("1")). - Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("4")). 
- Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.PathParameter("namespace", "specific namespace").DataType("string").Required(true).DefaultValue("monitoring")). + Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("pod_memory_utilisation_wo_cache")). + Param(ws.QueryParameter("resources_filter", "pod re2 expression filter").DataType("string").Required(false).DefaultValue("")). + Param(ws.QueryParameter("sort_metric", "sort metric").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). + Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). + Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) ws.Route(ws.GET("/namespaces/{namespace}/pods/{pod}").To(monitoring.MonitorPod). Doc("monitor specific pod level metrics"). - Param(ws.PathParameter("namespace", "specific namespace").Required(true).DefaultValue("monitoring")). - Param(ws.PathParameter("pod", "specific pod").Required(true).DefaultValue("")). - Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").Required(true).DefaultValue("pod_memory_utilisation_wo_cache")). - Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.PathParameter("namespace", "specific namespace").DataType("string").Required(true).DefaultValue("monitoring")). + Param(ws.PathParameter("pod", "specific pod").DataType("string").Required(true).DefaultValue("")). + Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").DataType("string").Required(true).DefaultValue("pod_memory_utilisation_wo_cache")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). 
+ Produces(restful.MIME_JSON) ws.Route(ws.GET("/nodes/{node}/pods").To(monitoring.MonitorPod). Doc("monitor pods level metrics by nodeid"). - Param(ws.PathParameter("node", "specific node").Required(true).DefaultValue("i-k89a62il")). - Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("pod_memory_utilisation_wo_cache")). - Param(ws.QueryParameter("pods_filter", "pod re2 expression filter").Required(false).DefaultValue("openpitrix.*")). - Param(ws.QueryParameter("sort_metric", "sort metric").Required(false)). - Param(ws.QueryParameter("sort_type", "ascending descending order").Required(false)). - Param(ws.QueryParameter("page", "page number").Required(false).DefaultValue("1")). - Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("4")). - Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.PathParameter("node", "specific node").DataType("string").Required(true).DefaultValue("i-k89a62il")). + Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("pod_memory_utilisation_wo_cache")). + Param(ws.QueryParameter("resources_filter", "pod re2 expression filter").DataType("string").Required(false).DefaultValue("openpitrix.*")). + Param(ws.QueryParameter("sort_metric", "sort metric").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). + Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). + Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) ws.Route(ws.GET("/nodes/{node}/pods/{pod}").To(monitoring.MonitorPod). 
Doc("monitor specific pod level metrics by nodeid"). - Param(ws.PathParameter("node", "specific node").Required(true).DefaultValue("i-k89a62il")). - Param(ws.PathParameter("pod", "specific pod").Required(true).DefaultValue("")). - Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").Required(true).DefaultValue("pod_memory_utilisation_wo_cache")). - Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.PathParameter("node", "specific node").DataType("string").Required(true).DefaultValue("i-k89a62il")). + Param(ws.PathParameter("pod", "specific pod").DataType("string").Required(true).DefaultValue("")). + Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").DataType("string").Required(true).DefaultValue("pod_memory_utilisation_wo_cache")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) ws.Route(ws.GET("/nodes/{node}/pods/{pod}/containers").To(monitoring.MonitorContainer). Doc("monitor specific pod level metrics by nodeid"). - Param(ws.PathParameter("node", "specific node").Required(true)). - Param(ws.PathParameter("pod", "specific pod").Required(true)). - Param(ws.QueryParameter("containers_filter", "container re2 expression filter").Required(false).DefaultValue("")). - Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...").Required(false)). - Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").Required(true).DefaultValue("pod_memory_utilisation_wo_cache")). - Param(ws.QueryParameter("sort_metric", "sort metric").Required(false)). - Param(ws.QueryParameter("sort_type", "ascending descending order").Required(false)). - Param(ws.QueryParameter("page", "page number").Required(false).DefaultValue("1")). - Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("4")). - Param(ws.QueryParameter("type", "rank, statistic").Required(false).DefaultValue("rank")). 
- Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.PathParameter("node", "specific node").DataType("string").Required(true)). + Param(ws.PathParameter("pod", "specific pod").DataType("string").Required(true)). + Param(ws.QueryParameter("resources_filter", "container re2 expression filter").DataType("string").Required(false).DefaultValue("")). + Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...").DataType("string").Required(false)). + Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").DataType("string").Required(true).DefaultValue("pod_memory_utilisation_wo_cache")). + Param(ws.QueryParameter("sort_metric", "sort metric").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). + Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). + Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")). + Param(ws.QueryParameter("type", "rank, statistic").DataType("string").Required(false).DefaultValue("rank")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) ws.Route(ws.GET("/namespaces/{namespace}/pods/{pod}/containers").To(monitoring.MonitorContainer). Doc("monitor containers level metrics"). - Param(ws.PathParameter("namespace", "specific namespace").Required(true).DefaultValue("monitoring")). - Param(ws.PathParameter("pod", "specific pod").Required(true).DefaultValue("")). - Param(ws.QueryParameter("containers_filter", "container re2 expression filter").Required(false).DefaultValue("")). - Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...").Required(false)). - Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").Required(true).DefaultValue("container_memory_utilisation_wo_cache")). 
- Param(ws.QueryParameter("sort_metric", "sort metric").Required(false)). - Param(ws.QueryParameter("sort_type", "ascending descending order").Required(false)). - Param(ws.QueryParameter("page", "page number").Required(false).DefaultValue("1")). - Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("4")). - Param(ws.QueryParameter("type", "rank, statistic").Required(false).DefaultValue("rank")). - Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.PathParameter("namespace", "specific namespace").DataType("string").Required(true).DefaultValue("monitoring")). + Param(ws.PathParameter("pod", "specific pod").DataType("string").Required(true).DefaultValue("")). + Param(ws.QueryParameter("resources_filter", "container re2 expression filter").DataType("string").Required(false).DefaultValue("")). + Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...").DataType("string").Required(false)). + Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").DataType("string").Required(true).DefaultValue("container_memory_utilisation_wo_cache")). + Param(ws.QueryParameter("sort_metric", "sort metric").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). + Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). + Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")). + Param(ws.QueryParameter("type", "rank, statistic").DataType("string").Required(false).DefaultValue("rank")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) ws.Route(ws.GET("/namespaces/{namespace}/pods/{pod}/containers/{container}").To(monitoring.MonitorContainer). Doc("monitor specific container level metrics"). 
- Param(ws.PathParameter("namespace", "specific namespace").Required(true).DefaultValue("monitoring")). - Param(ws.PathParameter("pod", "specific pod").Required(true).DefaultValue("")). - Param(ws.PathParameter("container", "specific container").Required(true).DefaultValue("")). - Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").Required(true).DefaultValue("container_memory_utilisation_wo_cache")). - Metadata(restfulspec.KeyOpenAPITags, tags)) - - ws.Route(ws.GET("/namespaces/{namespace}/workloads/{workload_kind}").To(monitoring.MonitorWorkload). + Param(ws.PathParameter("namespace", "specific namespace").DataType("string").Required(true).DefaultValue("monitoring")). + Param(ws.PathParameter("pod", "specific pod").DataType("string").Required(true).DefaultValue("")). + Param(ws.PathParameter("container", "specific container").DataType("string").Required(true).DefaultValue("")). + Param(ws.QueryParameter("metrics_name", "metrics name cpu memory...").DataType("string").Required(true).DefaultValue("container_memory_utilisation_wo_cache")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) + + ws.Route(ws.GET("/namespaces/{namespace}/workloads/{workload_kind}/{workload}").To(monitoring.MonitorWorkload). Doc("monitor specific workload level metrics"). - Param(ws.PathParameter("namespace", "namespace").Required(true).DefaultValue("kube-system")). - Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...").Required(false)). - Param(ws.PathParameter("workload_kind", "workload kind").Required(false).DefaultValue("daemonset")). - Param(ws.QueryParameter("workload_name", "workload name").Required(true).DefaultValue("")). - Param(ws.QueryParameter("pods_filter", "pod re2 expression filter").Required(false).DefaultValue("openpitrix.*")). - Param(ws.QueryParameter("sort_metric", "sort metric").Required(false)). 
- Param(ws.QueryParameter("sort_type", "ascending descending order").Required(false)). - Param(ws.QueryParameter("page", "page number").Required(false).DefaultValue("1")). - Param(ws.QueryParameter("limit", "max metric items in a page").Required(false).DefaultValue("4")). - Param(ws.QueryParameter("type", "rank, statistic").Required(false).DefaultValue("rank")). - Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.PathParameter("namespace", "namespace").DataType("string").Required(true).DefaultValue("kube-system")). + Param(ws.PathParameter("workload_kind", "workload kind").DataType("string").Required(true).DefaultValue("daemonset")). + Param(ws.PathParameter("workload", "workload name").DataType("string").Required(true).DefaultValue("")). + Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...").DataType("string").Required(false)). + Param(ws.QueryParameter("resources_filter", "pod re2 expression filter").DataType("string").Required(false).DefaultValue("openpitrix.*")). + Param(ws.QueryParameter("sort_metric", "sort metric").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). + Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). + Param(ws.QueryParameter("limit", "max metric items in a page").DataType("string").Required(false).DefaultValue("4")). + Param(ws.QueryParameter("type", "rank, statistic").DataType("string").Required(false).DefaultValue("rank")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) ws.Route(ws.GET("/namespaces/{namespace}/workloads").To(monitoring.MonitorWorkload). Doc("monitor all workload level metrics"). - Param(ws.PathParameter("namespace", "namespace").Required(true).DefaultValue("kube-system")). - Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...").Required(false)). 
- Param(ws.QueryParameter("workloads_filter", "pod re2 expression filter").Required(false).DefaultValue("")). - Param(ws.QueryParameter("sort_metric", "sort metric").Required(false)). - Param(ws.QueryParameter("sort_type", "ascending descending order").Required(false)). - Param(ws.QueryParameter("page", "page number").Required(false).DefaultValue("1")). - Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("4")). - Param(ws.QueryParameter("type", "rank, statistic").Required(false).DefaultValue("rank")). - Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.PathParameter("namespace", "namespace").DataType("string").Required(true).DefaultValue("kube-system")). + Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...").DataType("string").Required(false)). + Param(ws.QueryParameter("resources_filter", "pod re2 expression filter").DataType("string").Required(false).DefaultValue("")). + Param(ws.QueryParameter("sort_metric", "sort metric").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). + Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). + Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")). + Param(ws.QueryParameter("type", "rank, statistic").DataType("string").Required(false).DefaultValue("rank")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) // list all namespace in this workspace by selected metrics ws.Route(ws.GET("/workspaces/{workspace}").To(monitoring.MonitorOneWorkspace). Doc("monitor workspaces level metrics"). - Param(ws.PathParameter("workspace", "workspace name").Required(true)). - Param(ws.QueryParameter("namespaces_filter", "namespaces filter").Required(false).DefaultValue("k.*")). 
- Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("namespace_memory_utilisation_wo_cache")). - Param(ws.QueryParameter("sort_metric", "sort metric").Required(false)). - Param(ws.QueryParameter("sort_type", "ascending descending order").Required(false)). - Param(ws.QueryParameter("page", "page number").Required(false).DefaultValue("1")). - Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("4")). - Param(ws.QueryParameter("type", "rank, statistic").Required(false).DefaultValue("rank")). - Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.PathParameter("workspace", "workspace name").DataType("string").Required(true)). + Param(ws.QueryParameter("resources_filter", "namespaces filter").DataType("string").Required(false).DefaultValue("k.*")). + Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("namespace_memory_utilisation_wo_cache")). + Param(ws.QueryParameter("sort_metric", "sort metric").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). + Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). + Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")). + Param(ws.QueryParameter("type", "rank, statistic").DataType("string").Required(false).DefaultValue("rank")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) ws.Route(ws.GET("/workspaces").To(monitoring.MonitorAllWorkspaces). Doc("monitor workspaces level metrics"). - Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("workspace_memory_utilisation")). 
- Param(ws.QueryParameter("workspaces_filter", "workspaces re2 expression filter").Required(false).DefaultValue(".*")). - Param(ws.QueryParameter("sort_metric", "sort metric").Required(false)). - Param(ws.QueryParameter("sort_type", "ascending descending order").Required(false)). - Param(ws.QueryParameter("page", "page number").Required(false).DefaultValue("1")). - Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").Required(false).DefaultValue("4")). - Param(ws.QueryParameter("type", "rank, statistic").Required(false).DefaultValue("rank")). - Metadata(restfulspec.KeyOpenAPITags, tags)) - - ws.Route(ws.GET("/components").To(monitoring.MonitorComponentStatus). - Doc("monitor k8s components status"). - Metadata(restfulspec.KeyOpenAPITags, tags)) + Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("workspace_memory_utilisation")). + Param(ws.QueryParameter("resources_filter", "workspaces re2 expression filter").DataType("string").Required(false).DefaultValue(".*")). + Param(ws.QueryParameter("sort_metric", "sort metric").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). + Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). + Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")). + Param(ws.QueryParameter("type", "rank, statistic").DataType("string").Required(false).DefaultValue("rank")). + Metadata(restfulspec.KeyOpenAPITags, tags)). + Consumes(restful.MIME_JSON, restful.MIME_XML). 
+ Produces(restful.MIME_JSON) c.Add(ws) return nil diff --git a/pkg/apiserver/logging/logging.go b/pkg/apiserver/logging/logging.go new file mode 100644 index 0000000000000000000000000000000000000000..68ec7394861d76a313e910be1820b3be8493ffc6 --- /dev/null +++ b/pkg/apiserver/logging/logging.go @@ -0,0 +1,224 @@ +/* + + Copyright 2019 The KubeSphere Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ + +package logging + +import ( + "github.com/emicklei/go-restful" + "github.com/golang/glog" + "kubesphere.io/kubesphere/pkg/models/log" + es "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" + fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit" + "net/http" + "strconv" +) + +func LoggingQueryCluster(request *restful.Request, response *restful.Response) { + res := logQuery(log.QueryLevelCluster, request) + response.WriteAsJson(res) +} + +func LoggingQueryWorkspace(request *restful.Request, response *restful.Response) { + res := logQuery(log.QueryLevelWorkspace, request) + response.WriteAsJson(res) +} + +func LoggingQueryNamespace(request *restful.Request, response *restful.Response) { + res := logQuery(log.QueryLevelNamespace, request) + response.WriteAsJson(res) +} + +func LoggingQueryWorkload(request *restful.Request, response *restful.Response) { + res := logQuery(log.QueryLevelWorkload, request) + response.WriteAsJson(res) +} + +func LoggingQueryPod(request *restful.Request, response *restful.Response) { + res := logQuery(log.QueryLevelPod, request) + 
response.WriteAsJson(res) +} + +func LoggingQueryContainer(request *restful.Request, response *restful.Response) { + res := logQuery(log.QueryLevelContainer, request) + response.WriteAsJson(res) +} + +func LoggingQueryFluentbitFilters(request *restful.Request, response *restful.Response) { + res := log.FluentbitFiltersQuery() + response.WriteAsJson(res) +} + +func LoggingUpdateFluentbitFilters(request *restful.Request, response *restful.Response) { + + var res *log.FluentbitFiltersResult + + filters := new([]log.FluentbitFilter) + + err := request.ReadEntity(&filters) + if err != nil { + res = &log.FluentbitFiltersResult{Status: http.StatusBadRequest} + } else { + res = log.FluentbitFiltersUpdate(filters) + } + + response.WriteAsJson(res) +} + +func LoggingQueryFluentbitOutputs(request *restful.Request, response *restful.Response) { + res := log.FluentbitOutputsQuery() + response.WriteAsJson(res) +} + +func LoggingInsertFluentbitOutput(request *restful.Request, response *restful.Response) { + + var output fb.OutputPlugin + var res *log.FluentbitOutputsResult + + err := request.ReadEntity(&output) + if err != nil { + res = &log.FluentbitOutputsResult{Status: http.StatusBadRequest} + } else { + res = log.FluentbitOutputInsert(output) + } + + response.WriteAsJson(res) +} + +func LoggingUpdateFluentbitOutput(request *restful.Request, response *restful.Response) { + + var output fb.OutputPlugin + + id := request.PathParameter("output") + _, err := strconv.ParseUint(id, 10, 64) + if err != nil { + res := &log.FluentbitOutputsResult{Status: http.StatusBadRequest} + response.WriteAsJson(res) + return + } + err = request.ReadEntity(&output) + if err != nil { + res := &log.FluentbitOutputsResult{Status: http.StatusBadRequest} + response.WriteAsJson(res) + return + } + + res := log.FluentbitOutputUpdate(output, id) + response.WriteAsJson(res) +} + +func LoggingDeleteFluentbitOutput(request *restful.Request, response *restful.Response) { + + var res *log.FluentbitOutputsResult 
+ + id := request.PathParameter("output") + _, err := strconv.ParseUint(id, 10, 64) + if err != nil { + res = &log.FluentbitOutputsResult{Status: http.StatusBadRequest} + } else { + res = log.FluentbitOutputDelete(id) + } + + response.WriteAsJson(res) +} + +func logQuery(level log.LogQueryLevel, request *restful.Request) *es.QueryResult { + var param es.QueryParameters + + param.Operation = request.QueryParameter("operation") + + switch level { + case log.QueryLevelCluster: + { + param.NamespaceFilled, param.Namespaces = log.QueryWorkspace(request.QueryParameter("workspaces"), request.QueryParameter("workspace_query")) + param.NamespaceFilled, param.Namespaces = log.MatchNamespace(request.QueryParameter("namespaces"), param.NamespaceFilled, param.Namespaces) + param.NamespaceQuery = request.QueryParameter("namespace_query") + param.PodFilled, param.Pods = log.QueryWorkload(request.QueryParameter("workloads"), request.QueryParameter("workload_query"), param.Namespaces) + param.PodFilled, param.Pods = log.MatchPod(request.QueryParameter("pods"), param.PodFilled, param.Pods) + param.PodQuery = request.QueryParameter("pod_query") + param.ContainerFilled, param.Containers = log.MatchContainer(request.QueryParameter("containers")) + param.ContainerQuery = request.QueryParameter("container_query") + } + case log.QueryLevelWorkspace: + { + param.NamespaceFilled, param.Namespaces = log.QueryWorkspace(request.PathParameter("workspace"), "") + param.NamespaceFilled, param.Namespaces = log.MatchNamespace(request.QueryParameter("namespaces"), param.NamespaceFilled, param.Namespaces) + param.NamespaceQuery = request.QueryParameter("namespace_query") + param.PodFilled, param.Pods = log.QueryWorkload(request.QueryParameter("workloads"), request.QueryParameter("workload_query"), param.Namespaces) + param.PodFilled, param.Pods = log.MatchPod(request.QueryParameter("pods"), param.PodFilled, param.Pods) + param.PodQuery = request.QueryParameter("pod_query") + param.ContainerFilled, 
param.Containers = log.MatchContainer(request.QueryParameter("containers")) + param.ContainerQuery = request.QueryParameter("container_query") + } + case log.QueryLevelNamespace: + { + param.NamespaceFilled, param.Namespaces = log.MatchNamespace(request.PathParameter("namespace"), false, nil) + param.PodFilled, param.Pods = log.QueryWorkload(request.QueryParameter("workloads"), request.QueryParameter("workload_query"), param.Namespaces) + param.PodFilled, param.Pods = log.MatchPod(request.QueryParameter("pods"), param.PodFilled, param.Pods) + param.PodQuery = request.QueryParameter("pod_query") + param.ContainerFilled, param.Containers = log.MatchContainer(request.QueryParameter("containers")) + param.ContainerQuery = request.QueryParameter("container_query") + } + case log.QueryLevelWorkload: + { + param.NamespaceFilled, param.Namespaces = log.MatchNamespace(request.PathParameter("namespace"), false, nil) + param.PodFilled, param.Pods = log.QueryWorkload(request.PathParameter("workload"), "", param.Namespaces) + param.PodFilled, param.Pods = log.MatchPod(request.QueryParameter("pods"), param.PodFilled, param.Pods) + param.PodQuery = request.QueryParameter("pod_query") + param.ContainerFilled, param.Containers = log.MatchContainer(request.QueryParameter("containers")) + param.ContainerQuery = request.QueryParameter("container_query") + } + case log.QueryLevelPod: + { + param.NamespaceFilled, param.Namespaces = log.MatchNamespace(request.PathParameter("namespace"), false, nil) + param.PodFilled, param.Pods = log.MatchPod(request.PathParameter("pod"), false, nil) + param.ContainerFilled, param.Containers = log.MatchContainer(request.QueryParameter("containers")) + param.ContainerQuery = request.QueryParameter("container_query") + } + case log.QueryLevelContainer: + { + param.NamespaceFilled, param.Namespaces = log.MatchNamespace(request.PathParameter("namespace"), false, nil) + param.PodFilled, param.Pods = log.MatchPod(request.PathParameter("pod"), false, nil) + 
param.ContainerFilled, param.Containers = log.MatchContainer(request.PathParameter("container")) + } + } + + if len(param.Namespaces) == 1 { + param.Workspace = log.GetWorkspaceOfNamesapce(param.Namespaces[0]) + } + + param.Interval = request.QueryParameter("interval") + + param.LogQuery = request.QueryParameter("log_query") + param.StartTime = request.QueryParameter("start_time") + param.EndTime = request.QueryParameter("end_time") + param.Sort = request.QueryParameter("sort") + + var err error + param.From, err = strconv.ParseInt(request.QueryParameter("from"), 10, 64) + if err != nil { + param.From = 0 + } + param.Size, err = strconv.ParseInt(request.QueryParameter("size"), 10, 64) + if err != nil { + param.Size = 10 + } + + glog.Infof("LogQuery with %v", param) + + return es.Query(param) +} diff --git a/pkg/apiserver/monitoring/monitoring.go b/pkg/apiserver/monitoring/monitoring.go index 39c8e3c495b1bded3c51c0853a2b27e11ef0e13c..2d5d2e8f5af94075d89f03613a97bf12beeab994 100644 --- a/pkg/apiserver/monitoring/monitoring.go +++ b/pkg/apiserver/monitoring/monitoring.go @@ -20,11 +20,11 @@ package monitoring import ( "github.com/emicklei/go-restful" "kubesphere.io/kubesphere/pkg/models/metrics" - "kubesphere.io/kubesphere/pkg/simple/client/prometheus" + prom "kubesphere.io/kubesphere/pkg/simple/client/prometheus" ) func MonitorPod(request *restful.Request, response *restful.Response) { - requestParams := prometheus.ParseMonitoringRequestParams(request) + requestParams := prom.ParseMonitoringRequestParams(request) podName := requestParams.PodName metricName := requestParams.MetricsName if podName != "" { @@ -32,7 +32,8 @@ func MonitorPod(request *restful.Request, response *restful.Response) { queryType, params, nullRule := metrics.AssemblePodMetricRequestInfo(requestParams, metricName) var res *metrics.FormatedMetric if !nullRule { - res = metrics.GetMetric(queryType, params, metricName) + metricsStr := prom.SendMonitoringRequest(queryType, params) + res = 
metrics.ReformatJson(metricsStr, metricName, map[string]string{"pod_name": ""}) } response.WriteAsJson(res) @@ -40,21 +41,20 @@ func MonitorPod(request *restful.Request, response *restful.Response) { // multiple rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelPod) // sorting - sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics, metrics.MetricLevelPodName) + sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) // paging pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount) - response.WriteAsJson(pagedMetrics) } } func MonitorContainer(request *restful.Request, response *restful.Response) { - requestParams := prometheus.ParseMonitoringRequestParams(request) + requestParams := prom.ParseMonitoringRequestParams(request) metricName := requestParams.MetricsName if requestParams.MetricsFilter != "" { rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelContainer) // sorting - sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics, metrics.MetricLevelContainerName) + sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) // paging pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount) @@ -68,7 +68,7 @@ func MonitorContainer(request *restful.Request, response *restful.Response) { } func MonitorWorkload(request *restful.Request, response *restful.Response) { - requestParams := prometheus.ParseMonitoringRequestParams(request) + requestParams := prom.ParseMonitoringRequestParams(request) rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkload) @@ -80,10 +80,10 @@ func MonitorWorkload(request *restful.Request, response *restful.Response) { // sorting if wlKind == "" { - 
sortedMetrics, maxMetricCount = metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics, metrics.MetricLevelWorkload) + sortedMetrics, maxMetricCount = metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) } else { - sortedMetrics, maxMetricCount = metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics, metrics.MetricLevelPodName) + sortedMetrics, maxMetricCount = metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) } // paging @@ -95,19 +95,18 @@ func MonitorWorkload(request *restful.Request, response *restful.Response) { func MonitorAllWorkspaces(request *restful.Request, response *restful.Response) { - requestParams := prometheus.ParseMonitoringRequestParams(request) + requestParams := prom.ParseMonitoringRequestParams(request) tp := requestParams.Tp - if tp == "_statistics" { + if tp == "statistics" { // merge multiple metric: all-devops, all-roles, all-projects...this api is designed for admin res := metrics.MonitorAllWorkspacesStatistics() - response.WriteAsJson(res) } else if tp == "rank" { rawMetrics := metrics.MonitorAllWorkspaces(requestParams) // sorting - sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics, metrics.MetricLevelWorkspace) + sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) // paging pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount) @@ -119,20 +118,19 @@ func MonitorAllWorkspaces(request *restful.Request, response *restful.Response) } func MonitorOneWorkspace(request *restful.Request, response *restful.Response) { - requestParams := prometheus.ParseMonitoringRequestParams(request) + requestParams := prom.ParseMonitoringRequestParams(request) tp := requestParams.Tp if tp == "rank" { // multiple rawMetrics := metrics.MonitorAllMetrics(requestParams, 
metrics.MetricLevelWorkspace) // sorting - sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics, metrics.MetricLevelNamespace) + sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) // paging pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount) - response.WriteAsJson(pagedMetrics) - } else if tp == "_statistics" { + } else if tp == "statistics" { wsName := requestParams.WsName // merge multiple metric: devops, roles, projects... @@ -145,34 +143,35 @@ func MonitorOneWorkspace(request *restful.Request, response *restful.Response) { } func MonitorNamespace(request *restful.Request, response *restful.Response) { - requestParams := prometheus.ParseMonitoringRequestParams(request) + requestParams := prom.ParseMonitoringRequestParams(request) metricName := requestParams.MetricsName nsName := requestParams.NsName if nsName != "" { // single queryType, params := metrics.AssembleNamespaceMetricRequestInfo(requestParams, metricName) - res := metrics.GetMetric(queryType, params, metricName) + metricsStr := prom.SendMonitoringRequest(queryType, params) + res := metrics.ReformatJson(metricsStr, metricName, map[string]string{"namespace": ""}) response.WriteAsJson(res) } else { // multiple rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelNamespace) // sorting - sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics, metrics.MetricLevelNamespace) + sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) // paging pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount) - response.WriteAsJson(pagedMetrics) } } func MonitorCluster(request *restful.Request, response *restful.Response) { - requestParams := 
prometheus.ParseMonitoringRequestParams(request) + requestParams := prom.ParseMonitoringRequestParams(request) metricName := requestParams.MetricsName if metricName != "" { // single queryType, params := metrics.AssembleClusterMetricRequestInfo(requestParams, metricName) - res := metrics.GetMetric(queryType, params, metricName) + metricsStr := prom.SendMonitoringRequest(queryType, params) + res := metrics.ReformatJson(metricsStr, metricName, map[string]string{"cluster": "local"}) response.WriteAsJson(res) } else { @@ -183,15 +182,17 @@ func MonitorCluster(request *restful.Request, response *restful.Response) { } func MonitorNode(request *restful.Request, response *restful.Response) { - requestParams := prometheus.ParseMonitoringRequestParams(request) + requestParams := prom.ParseMonitoringRequestParams(request) metricName := requestParams.MetricsName if metricName != "" { // single queryType, params := metrics.AssembleNodeMetricRequestInfo(requestParams, metricName) - res := metrics.GetMetric(queryType, params, metricName) + metricsStr := prom.SendMonitoringRequest(queryType, params) + res := metrics.ReformatJson(metricsStr, metricName, map[string]string{"node": ""}) nodeAddress := metrics.GetNodeAddressInfo() metrics.AddNodeAddressMetric(res, nodeAddress) + response.WriteAsJson(res) } else { // multiple @@ -203,18 +204,10 @@ func MonitorNode(request *restful.Request, response *restful.Response) { } // sorting - sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics, metrics.MetricLevelNode) + sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) // paging pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount) response.WriteAsJson(pagedMetrics) } } - -// k8s component(controller, scheduler, etcd) status -func MonitorComponentStatus(request *restful.Request, response *restful.Response) { - requestParams 
:= prometheus.ParseMonitoringRequestParams(request) - - status := metrics.MonitorComponentStatus(requestParams) - response.WriteAsJson(status) -} diff --git a/pkg/db/schema/V0__Logging_output_configs.sql b/pkg/db/schema/V0__Logging_output_configs.sql new file mode 100644 index 0000000000000000000000000000000000000000..099f5f730509a8649d5ea870888e4a6e49ac21fe --- /dev/null +++ b/pkg/db/schema/V0__Logging_output_configs.sql @@ -0,0 +1,14 @@ +CREATE TABLE output_db_bindings + ( + id INT NOT NULL AUTO_INCREMENT, + type TEXT NOT NULL, + name TEXT NOT NULL, + parameters TEXT NOT NULL, + internal BOOLEAN, + enable BOOLEAN NOT NULL, + updatetime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (id) + ); + +INSERT INTO output_db_bindings (type, name, parameters, enable) VALUES ('fluentbit_output', 'fluentbit-output', '[{"name": "Name", "value": "es"}, {"name": "Match", "value": "kube.*"}, {"name": "Host", "value": "elasticsearch-logging-data.kubesphere-logging-system.svc"}, {"name": "Port", "value": "9200"}, {"name": "Logstash_Format", "value": "On"}, {"name": "Replace_Dots", "value": "on"}, {"name": "Retry_Limit", "value": "False"}, {"name": "Type", "value": "flb_type"}, {"name": "Time_Key", "value": "@timestamp"}, {"name": "Logstash_Prefix", "value": "logstash"} ]', '1'); \ No newline at end of file diff --git a/pkg/models/log/constants.go b/pkg/models/log/constants.go new file mode 100644 index 0000000000000000000000000000000000000000..a184b6f64615f0551ee310e0e6bfb13aed7f8b34 --- /dev/null +++ b/pkg/models/log/constants.go @@ -0,0 +1,30 @@ +/* + + Copyright 2019 The KubeSphere Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ + +package log + +type LogQueryLevel int + +const ( + QueryLevelCluster LogQueryLevel = iota + QueryLevelWorkspace + QueryLevelNamespace + QueryLevelWorkload + QueryLevelPod + QueryLevelContainer +) \ No newline at end of file diff --git a/pkg/models/log/logcollector.go b/pkg/models/log/logcollector.go new file mode 100644 index 0000000000000000000000000000000000000000..0afe56c548c4db434926c312480f69aedd324e71 --- /dev/null +++ b/pkg/models/log/logcollector.go @@ -0,0 +1,309 @@ +/* + + Copyright 2019 The KubeSphere Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ + +package log + +import ( + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/labels" + "kubesphere.io/kubesphere/pkg/constants" + "kubesphere.io/kubesphere/pkg/informers" + "reflect" + "strings" +) + +func intersection(s1, s2 []string) (inter []string) { + hash := make(map[string]bool) + for _, e := range s1 { + hash[e] = true + } + for _, e := range s2 { + // If elements present in the hashmap then append intersection list. 
+ if hash[e] { + inter = append(inter, e) + } + } + //Remove dups from slice. + inter = removeDups(inter) + return +} + +//Remove dups from slice. +func removeDups(elements []string) (nodups []string) { + encountered := make(map[string]bool) + for _, element := range elements { + if !encountered[element] { + nodups = append(nodups, element) + encountered[element] = true + } + } + return +} + +func in(value interface{}, container interface{}) int { + if container == nil { + return -1 + } + containerValue := reflect.ValueOf(container) + switch reflect.TypeOf(container).Kind() { + case reflect.Slice, reflect.Array: + for i := 0; i < containerValue.Len(); i++ { + if containerValue.Index(i).Interface() == value { + return i + } + } + case reflect.Map: + if containerValue.MapIndex(reflect.ValueOf(value)).IsValid() { + return -1 + } + default: + return -1 + } + return -1 +} + +func getWorkloadName(name string, kind string) string { + if kind == "ReplicaSet" { + lastIndex := strings.LastIndex(name, "-") + if lastIndex >= 0 { + return name[:lastIndex] + } + } + + return name +} + +func matchLabel(label string, labelsMatch []string) bool { + var result = false + + for _, labelMatch := range labelsMatch { + if strings.Compare(label, labelMatch) == 0 { + result = true + break + } + } + + return result +} + +func queryLabel(label string, labelsQuery []string) bool { + var result = false + + for _, labelQuery := range labelsQuery { + if strings.Contains(label, labelQuery) { + result = true + break + } + } + + return result +} + +func QueryWorkspace(workspaceMatch string, workspaceQuery string) (bool, []string) { + if workspaceMatch == "" && workspaceQuery == "" { + return false, nil + } + + nsLister := informers.SharedInformerFactory().Core().V1().Namespaces().Lister() + nsList, err := nsLister.List(labels.Everything()) + if err != nil { + glog.Error("failed to list namespace, error: ", err) + return true, nil + } + + var namespaces []string + + var hasMatch = false + var 
workspacesMatch []string + if workspaceMatch != "" { + workspacesMatch = strings.Split(strings.Replace(workspaceMatch, ",", " ", -1), " ") + hasMatch = true + } + + var hasQuery = false + var workspacesQuery []string + if workspaceQuery != "" { + workspacesQuery = strings.Split(strings.ToLower(strings.Replace(workspaceQuery, ",", " ", -1)), " ") + hasQuery = true + } + + for _, ns := range nsList { + labels := ns.GetLabels() + _, ok := labels[constants.WorkspaceLabelKey] + if ok { + var namespaceCanAppend = true + if hasMatch { + if !matchLabel(labels[constants.WorkspaceLabelKey], workspacesMatch) { + namespaceCanAppend = false + } + } + if hasQuery { + if !queryLabel(strings.ToLower(labels[constants.WorkspaceLabelKey]), workspacesQuery) { + namespaceCanAppend = false + } + } + + if namespaceCanAppend { + namespaces = append(namespaces, ns.GetName()) + } + } + } + + return true, namespaces +} + +func MatchNamespace(namespaceMatch string, namespaceFilled bool, namespaces []string) (bool, []string) { + if namespaceMatch == "" { + return namespaceFilled, namespaces + } + + namespacesMatch := strings.Split(strings.Replace(namespaceMatch, ",", " ", -1), " ") + + if namespaceFilled { + return true, intersection(namespacesMatch, namespaces) + } + + return true, namespacesMatch +} + +func QueryWorkload(workloadMatch string, workloadQuery string, namespaces []string) (bool, []string) { + if workloadMatch == "" && workloadQuery == "" { + return false, nil + } + + podLister := informers.SharedInformerFactory().Core().V1().Pods().Lister() + podList, err := podLister.List(labels.Everything()) + if err != nil { + glog.Error("failed to list pods, error: ", err) + return true, nil + } + + var pods []string + + var hasMatch = false + var workloadsMatch []string + if workloadMatch != "" { + workloadsMatch = strings.Split(strings.Replace(workloadMatch, ",", " ", -1), " ") + hasMatch = true + } + + var hasQuery = false + var workloadsQuery []string + if workloadQuery != "" { + 
workloadsQuery = strings.Split(strings.ToLower(strings.Replace(workloadQuery, ",", " ", -1)), " ") + hasQuery = true + } + + if namespaces == nil { + for _, pod := range podList { + /*if len(pod.ObjectMeta.OwnerReferences) > 0 { + glog.Infof("List Pod %v:%v:%v", pod.Name, pod.ObjectMeta.OwnerReferences[0].Name, pod.ObjectMeta.OwnerReferences[0].Kind) + }*/ + if len(pod.ObjectMeta.OwnerReferences) > 0 { + var podCanAppend = true + workloadName := getWorkloadName(pod.ObjectMeta.OwnerReferences[0].Name, pod.ObjectMeta.OwnerReferences[0].Kind) + if hasMatch { + if !matchLabel(workloadName, workloadsMatch) { + podCanAppend = false + } + } + if hasQuery { + if !queryLabel(strings.ToLower(workloadName), workloadsQuery) { + podCanAppend = false + } + } + + if podCanAppend { + pods = append(pods, pod.Name) + } + } + } + } else { + for _, pod := range podList { + /*if len(pod.ObjectMeta.OwnerReferences) > 0 { + glog.Infof("List Pod %v:%v:%v", pod.Name, pod.ObjectMeta.OwnerReferences[0].Name, pod.ObjectMeta.OwnerReferences[0].Kind) + }*/ + if len(pod.ObjectMeta.OwnerReferences) > 0 && in(pod.Namespace, namespaces) >= 0 { + var podCanAppend = true + workloadName := getWorkloadName(pod.ObjectMeta.OwnerReferences[0].Name, pod.ObjectMeta.OwnerReferences[0].Kind) + if hasMatch { + if !matchLabel(workloadName, workloadsMatch) { + podCanAppend = false + } + } + if hasQuery { + if !queryLabel(strings.ToLower(workloadName), workloadsQuery) { + podCanAppend = false + } + } + + if podCanAppend { + pods = append(pods, pod.Name) + } + } + } + } + + return true, pods +} + +func MatchPod(podMatch string, podFilled bool, pods []string) (bool, []string) { + if podMatch == "" { + return podFilled, pods + } + + podsMatch := strings.Split(strings.Replace(podMatch, ",", " ", -1), " ") + + if podFilled { + return true, intersection(podsMatch, pods) + } + + return true, podsMatch +} + +func MatchContainer(containerMatch string) (bool, []string) { + if containerMatch == "" { + return false, nil + } 
+ + return true, strings.Split(strings.Replace(containerMatch, ",", " ", -1), " ") +} + +func GetWorkspaceOfNamesapce(namespace string) string { + var workspace string + workspace = "" + + nsLister := informers.SharedInformerFactory().Core().V1().Namespaces().Lister() + nsList, err := nsLister.List(labels.Everything()) + if err != nil { + glog.Error("failed to list namespace, error: ", err) + return workspace + } + + for _, ns := range nsList { + if ns.GetName() == namespace { + labels := ns.GetLabels() + _, ok := labels[constants.WorkspaceLabelKey] + if ok { + workspace = labels[constants.WorkspaceLabelKey] + } + } + } + + return workspace +} \ No newline at end of file diff --git a/pkg/models/log/logcrd.go b/pkg/models/log/logcrd.go new file mode 100644 index 0000000000000000000000000000000000000000..923077989f1a73074dd9ea9a5a6ecd61680d8b3d --- /dev/null +++ b/pkg/models/log/logcrd.go @@ -0,0 +1,472 @@ +/* +Copyright 2018 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package log + +import ( + "github.com/jinzhu/gorm" + "github.com/json-iterator/go" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + es "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" + fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit" + "kubesphere.io/kubesphere/pkg/simple/client/mysql" + "net/http" + "strings" + "time" + + _ "github.com/go-sql-driver/mysql" +) + +var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary + +func createCRDClientSet() (*rest.RESTClient, *runtime.Scheme, error) { + config, err := fb.GetClientConfig("") + if err != nil { + //panic(err.Error()) + return nil, nil, err + } + + // Create a new clientset which include our CRD schema + return fb.NewFluentbitCRDClient(config) +} + +func getParameterValue(parameters []fb.Parameter, name string) string { + var value string + + value = "" + for _, parameter := range parameters { + if parameter.Name == name { + value = parameter.Value + } + } + + return value +} + +func getFilters(result *FluentbitFiltersResult, Filters []fb.Plugin) { + for _, filter := range Filters { + if strings.Compare(filter.Name, "fluentbit-filter-input-regex") == 0 { + parameters := strings.Split(getParameterValue(filter.Parameters, "Regex"), " ") + field := strings.TrimSuffix(strings.TrimPrefix(parameters[0], "kubernetes_"), "_name") + expression := parameters[1] + result.Filters = append(result.Filters, FluentbitFilter{"Regex", field, expression}) + } + if strings.Compare(filter.Name, "fluentbit-filter-input-exclude") == 0 { + parameters := strings.Split(getParameterValue(filter.Parameters, "Exclude"), " ") + field := strings.TrimSuffix(strings.TrimPrefix(parameters[0], "kubernetes_"), "_name") + expression := parameters[1] + result.Filters = append(result.Filters, FluentbitFilter{"Exclude", field, expression}) + } + } +} + +func FluentbitFiltersQuery() *FluentbitFiltersResult { + var result FluentbitFiltersResult + + crdcs, scheme, err := createCRDClientSet() + if err != nil { 
+ result.Status = http.StatusInternalServerError + return &result + } + + // Create a CRD client interface + crdclient := fb.CrdClient(crdcs, scheme, "kubesphere-logging-system") + + item, err := crdclient.Get("fluent-bit") + if err != nil { + result.Status = http.StatusInternalServerError + return &result + } + + getFilters(&result, item.Spec.Filter) + + result.Status = http.StatusOK + + return &result +} + +func FluentbitFiltersUpdate(filters *[]FluentbitFilter) *FluentbitFiltersResult { + var result FluentbitFiltersResult + + //Generate filter plugin config + var filter []fb.Plugin + + var para_kubernetes []fb.Parameter + para_kubernetes = append(para_kubernetes, fb.Parameter{Name: "Name", Value: "kubernetes"}) + para_kubernetes = append(para_kubernetes, fb.Parameter{Name: "Match", Value: "kube.*"}) + para_kubernetes = append(para_kubernetes, fb.Parameter{Name: "Kube_URL", Value: "https://kubernetes.default.svc:443"}) + para_kubernetes = append(para_kubernetes, fb.Parameter{Name: "Kube_CA_File", Value: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"}) + para_kubernetes = append(para_kubernetes, fb.Parameter{Name: "Kube_Token_File", Value: "/var/run/secrets/kubernetes.io/serviceaccount/token"}) + filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-kubernetes", Parameters: para_kubernetes}) + + var para_lift []fb.Parameter + para_lift = append(para_lift, fb.Parameter{Name: "Name", Value: "nest"}) + para_lift = append(para_lift, fb.Parameter{Name: "Match", Value: "kube.*"}) + para_lift = append(para_lift, fb.Parameter{Name: "Operation", Value: "lift"}) + para_lift = append(para_lift, fb.Parameter{Name: "Nested_under", Value: "kubernetes"}) + para_lift = append(para_lift, fb.Parameter{Name: "Prefix_with", Value: "kubernetes_"}) + filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-lift", Parameters: para_lift}) + + var para_remove_stream []fb.Parameter + para_remove_stream = 
append(para_remove_stream, fb.Parameter{Name: "Name", Value: "modify"}) + para_remove_stream = append(para_remove_stream, fb.Parameter{Name: "Match", Value: "kube.*"}) + para_remove_stream = append(para_remove_stream, fb.Parameter{Name: "Remove", Value: "stream"}) + filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-remove-stream", Parameters: para_remove_stream}) + + var para_remove_labels []fb.Parameter + para_remove_labels = append(para_remove_labels, fb.Parameter{Name: "Name", Value: "modify"}) + para_remove_labels = append(para_remove_labels, fb.Parameter{Name: "Match", Value: "kube.*"}) + para_remove_labels = append(para_remove_labels, fb.Parameter{Name: "Remove", Value: "kubernetes_labels"}) + filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-remove-labels", Parameters: para_remove_labels}) + + var para_remove_annotations []fb.Parameter + para_remove_annotations = append(para_remove_annotations, fb.Parameter{Name: "Name", Value: "modify"}) + para_remove_annotations = append(para_remove_annotations, fb.Parameter{Name: "Match", Value: "kube.*"}) + para_remove_annotations = append(para_remove_annotations, fb.Parameter{Name: "Remove", Value: "kubernetes_annotations"}) + filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-remove-annotations", Parameters: para_remove_annotations}) + + var para_remove_pod_id []fb.Parameter + para_remove_pod_id = append(para_remove_pod_id, fb.Parameter{Name: "Name", Value: "modify"}) + para_remove_pod_id = append(para_remove_pod_id, fb.Parameter{Name: "Match", Value: "kube.*"}) + para_remove_pod_id = append(para_remove_pod_id, fb.Parameter{Name: "Remove", Value: "kubernetes_pod_id"}) + filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-remove-podid", Parameters: para_remove_pod_id}) + + var para_remove_docker_id []fb.Parameter + para_remove_docker_id = 
append(para_remove_docker_id, fb.Parameter{Name: "Name", Value: "modify"}) + para_remove_docker_id = append(para_remove_docker_id, fb.Parameter{Name: "Match", Value: "kube.*"}) + para_remove_docker_id = append(para_remove_docker_id, fb.Parameter{Name: "Remove", Value: "kubernetes_docker_id"}) + filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-remove-dockerid", Parameters: para_remove_docker_id}) + + if len(*filters) > 0 { + for _, item := range *filters { + if strings.Compare(item.Type, "Regex") == 0 { + field := "kubernetes_" + strings.TrimSpace(item.Field) + "_name" + expression := strings.TrimSpace(item.Expression) + + var para_regex []fb.Parameter + para_regex = append(para_regex, fb.Parameter{Name: "Name", Value: "grep"}) + para_regex = append(para_regex, fb.Parameter{Name: "Match", Value: "kube.*"}) + para_regex = append(para_regex, fb.Parameter{Name: "Regex", Value: field + " " + expression}) + filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-regex", Parameters: para_regex}) + } + + if strings.Compare(item.Type, "Exclude") == 0 { + field := "kubernetes_" + strings.TrimSpace(item.Field) + "_name" + expression := strings.TrimSpace(item.Expression) + + var para_exclude []fb.Parameter + para_exclude = append(para_exclude, fb.Parameter{Name: "Name", Value: "grep"}) + para_exclude = append(para_exclude, fb.Parameter{Name: "Match", Value: "kube.*"}) + para_exclude = append(para_exclude, fb.Parameter{Name: "Exclude", Value: field + " " + expression}) + filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-exclude", Parameters: para_exclude}) + } + } + } + + var para_nest []fb.Parameter + para_nest = append(para_nest, fb.Parameter{Name: "Name", Value: "nest"}) + para_nest = append(para_nest, fb.Parameter{Name: "Match", Value: "kube.*"}) + para_nest = append(para_nest, fb.Parameter{Name: "Operation", Value: "nest"}) + para_nest = append(para_nest, 
fb.Parameter{Name: "Wildcard", Value: "kubernetes_*"}) + para_nest = append(para_nest, fb.Parameter{Name: "Nested_under", Value: "kubernetes"}) + para_nest = append(para_nest, fb.Parameter{Name: "Remove_prefix", Value: "kubernetes_"}) + filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-nest", Parameters: para_nest}) + + crdcs, scheme, err := createCRDClientSet() + if err != nil { + result.Status = http.StatusInternalServerError + return &result + } + + // Create a CRD client interface + crdclient := fb.CrdClient(crdcs, scheme, "kubesphere-logging-system") + + var item *fb.FluentBit + var err_read error + + item, err_read = crdclient.Get("fluent-bit") + if err_read != nil { + result.Status = http.StatusInternalServerError + return &result + } + + item.Spec.Filter = filter + + itemnew, err := crdclient.Update("fluent-bit", item) + if err != nil { + result.Status = http.StatusInternalServerError + return &result + } + + getFilters(&result, itemnew.Spec.Filter) + result.Status = http.StatusOK + + return &result +} + +func FluentbitOutputsQuery() *FluentbitOutputsResult { + var result FluentbitOutputsResult + + // Retrieve outputs from DB + db := mysql.Client() + + var outputs []OutputDBBinding + + err := db.Find(&outputs).Error + if err != nil { + result.Status = http.StatusInternalServerError + return &result + } + + var unmarshaledOutputs []fb.OutputPlugin + + for _, output := range outputs { + var params []fb.Parameter + + err = jsonIter.UnmarshalFromString(output.Parameters, ¶ms) + if err != nil { + result.Status = http.StatusInternalServerError + return &result + } + + unmarshaledOutputs = append(unmarshaledOutputs, + fb.OutputPlugin{Plugin: fb.Plugin{Type: output.Type, Name: output.Name, Parameters: params}, + Id: output.Id, Enable: output.Enable, Updatetime: output.Updatetime}) + } + + result.Outputs = unmarshaledOutputs + result.Status = http.StatusOK + + return &result +} + +func FluentbitOutputInsert(output 
fb.OutputPlugin) *FluentbitOutputsResult { + var result FluentbitOutputsResult + + params, err := jsoniter.MarshalToString(output.Parameters) + if err != nil { + result.Status = http.StatusBadRequest + return &result + } + + // 1. Update DB + db := mysql.Client() + + marshaledOutput := OutputDBBinding{Type: output.Type, Name: output.Name, + Parameters: params, Enable: output.Enable, Updatetime: time.Now()} + err = db.Create(&marshaledOutput).Error + if err != nil { + result.Status = http.StatusInternalServerError + return &result + } + + // 2. Keep CRD in inline with DB + err = syncFluentbitCRDOutputWithDB(db) + if err != nil { + result.Status = http.StatusInternalServerError + return &result + } + + // 3. If it's an configs output added, reset configs client configs + configs := ParseEsOutputParams(output.Parameters) + if configs != nil { + configs.WriteESConfigs() + } + + result.Status = http.StatusOK + return &result +} + +func FluentbitOutputUpdate(output fb.OutputPlugin, id string) *FluentbitOutputsResult { + var result FluentbitOutputsResult + + // 1. Update DB + db := mysql.Client() + + params, err := jsoniter.MarshalToString(output.Parameters) + if err != nil { + result.Status = http.StatusBadRequest + return &result + } + + var marshaledOutput OutputDBBinding + err = db.Where("id = ?", id).First(&marshaledOutput).Error + if err != nil { + result.Status = http.StatusInternalServerError + return &result + } + + marshaledOutput.Name = output.Name + marshaledOutput.Type = output.Type + marshaledOutput.Parameters = params + marshaledOutput.Enable = output.Enable + + err = db.Save(&marshaledOutput).Error + if err != nil { + result.Status = http.StatusInternalServerError + return &result + } + + // 2. Keep CRD in inline with DB + err = syncFluentbitCRDOutputWithDB(db) + if err != nil { + result.Status = http.StatusBadRequest + return &result + } + + // 3. 
If it's an configs output updated, reset configs client configs + configs := ParseEsOutputParams(output.Parameters) + if configs != nil { + configs.WriteESConfigs() + } + + result.Status = http.StatusOK + return &result +} + +func FluentbitOutputDelete(id string) *FluentbitOutputsResult { + var result FluentbitOutputsResult + + // 1. Remove the record from DB + db := mysql.Client() + + err := db.Where("id = ?", id).Delete(&OutputDBBinding{}).Error + if err != nil { + result.Status = http.StatusInternalServerError + return &result + } + + // 2. Keep CRD in inline with DB + err = syncFluentbitCRDOutputWithDB(db) + if err != nil { + result.Status = http.StatusBadRequest + return &result + } + + result.Status = http.StatusOK + return &result +} + +func syncFluentbitCRDOutputWithDB(db *gorm.DB) error { + var outputs []OutputDBBinding + + err := db.Where("enable is true").Find(&outputs).Error + if err != nil { + return err + } + + var unmarshaledOutputs []fb.Plugin + + for _, output := range outputs { + var params []fb.Parameter + + err = jsonIter.UnmarshalFromString(output.Parameters, ¶ms) + if err != nil { + return err + } + + unmarshaledOutputs = append(unmarshaledOutputs, fb.Plugin{Type: output.Type, Name: output.Name, Parameters: params}) + } + // Empty output is not allowed, must specify a null-type output + if len(unmarshaledOutputs) == 0 { + unmarshaledOutputs = []fb.Plugin{ + { + Type: "fluentbit_output", + Name: "fluentbit-output-null", + Parameters: []fb.Parameter{ + { + Name: "Name", + Value: "null", + }, + { + Name: "Match", + Value: "*", + }, + }, + }, + } + } + + crdcs, scheme, err := createCRDClientSet() + if err != nil { + return err + } + + // Create a CRD client interface + crdclient := fb.CrdClient(crdcs, scheme, "kubesphere-logging-system") + + fluentbit, err := crdclient.Get("fluent-bit") + if err != nil { + return err + } + + fluentbit.Spec.Output = unmarshaledOutputs + _, err = crdclient.Update("fluent-bit", fluentbit) + if err != nil { + return 
err + } + + return nil +} + +// Parse es host, port and index +func ParseEsOutputParams(params []fb.Parameter) *es.ESConfigs { + + var ( + isEsFound bool + + host = "127.0.0.1" + port = "9200" + index = "logstash" + logstashFormat string + logstashPrefix string + ) + + for _, param := range params { + switch param.Name { + case "Name": + if param.Value == "es" { + isEsFound = true + } + case "Host": + host = param.Value + case "Port": + port = param.Value + case "Index": + index = param.Value + case "Logstash_Format": + logstashFormat = strings.ToLower(param.Value) + case "Logstash_Prefix": + logstashPrefix = param.Value + } + } + + if !isEsFound { + return nil + } + + // If Logstash_Format is On/True, ignore Index + if logstashFormat == "on" || logstashFormat == "true" { + if logstashPrefix != "" { + index = logstashPrefix + } else { + index = "logstash" + } + } + + return &es.ESConfigs{Host: host, Port: port, Index: index} +} diff --git a/pkg/models/log/types.go b/pkg/models/log/types.go new file mode 100644 index 0000000000000000000000000000000000000000..ca791106799d6193ce0cf2ef2da4827f4ed96e67 --- /dev/null +++ b/pkg/models/log/types.go @@ -0,0 +1,64 @@ +/* + + Copyright 2019 The KubeSphere Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +*/ + +package log + +import ( + fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit" + "time" +) + +type FluentbitCRDResult struct { + Status int `json:"status"` + CRD fb.FluentBitSpec `json:"CRD,omitempty"` +} + +type FluentbitCRDDeleteResult struct { + Status int `json:"status"` +} + +type FluentbitSettingsResult struct { + Status int `json:"status"` + Enable string `json:"Enable,omitempty"` +} + +type FluentbitFilter struct { + Type string `json:"type"` + Field string `json:"field"` + Expression string `json:"expression"` +} + +type FluentbitFiltersResult struct { + Status int `json:"status"` + Filters []FluentbitFilter `json:"filters,omitempty"` +} + +type FluentbitOutputsResult struct { + Status int `json:"status"` + Outputs []fb.OutputPlugin `json:"outputs,omitempty"` +} + +type OutputDBBinding struct { + Id uint `gorm:"primary_key;auto_increment;unique"` + Type string `gorm:"not null"` + Name string `gorm:"not null"` + Parameters string `gorm:"not null"` + Internal bool + Enable bool `gorm:"not null"` + Updatetime time.Time `gorm:"not null"` +} \ No newline at end of file diff --git a/pkg/models/metrics/metrics.go b/pkg/models/metrics/metrics.go index 17810bc01e69b6a88c76b2caf1922677037b6b55..8b8efa0f69b3920e2e7ab83c001e6b1306d24545 100644 --- a/pkg/models/metrics/metrics.go +++ b/pkg/models/metrics/metrics.go @@ -1,42 +1,32 @@ /* - Copyright 2019 The KubeSphere Authors. +Copyright 2019 The KubeSphere Authors. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ package metrics import ( - "fmt" - "kubesphere.io/kubesphere/pkg/models" - "kubesphere.io/kubesphere/pkg/simple/client/k8s" - "kubesphere.io/kubesphere/pkg/simple/client/prometheus" + "kubesphere.io/kubesphere/pkg/informers" "net/url" "regexp" "strings" "sync" "time" - "kubesphere.io/kubesphere/pkg/constants" - "kubesphere.io/kubesphere/pkg/informers" - "kubesphere.io/kubesphere/pkg/models/components" - "kubesphere.io/kubesphere/pkg/models/workspaces" - "github.com/golang/glog" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "runtime/debug" "sort" @@ -44,13 +34,13 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" -) -var ( - jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary - nodeStatusDelLabel = []string{"endpoint", "instance", "job", "namespace", "pod", "service"} + "kubesphere.io/kubesphere/pkg/models/workspaces" + client "kubesphere.io/kubesphere/pkg/simple/client/prometheus" ) +var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary + const ( ChannelMaxCapacityWorkspaceMetric = 800 ChannelMaxCapacity = 100 @@ -145,9 +135,9 @@ func getAllWorkspaces() map[string]int { paramValues := make(url.Values) paramValues.Set("query", WorkspaceNamespaceLabelRule) params := paramValues.Encode() - res 
:= prometheus.SendMonitoringRequest(prometheus.DefaultQueryType, params) + res := client.SendMonitoringRequest(client.DefaultQueryType, params) - metric := ReformatJson(res, "") + metric := ReformatJson(res, "", map[string]string{"workspace": "workspace"}) return getAllWorkspaceNames(metric) } @@ -240,17 +230,17 @@ func unifyMetricHistoryTimeRange(fmtMetrics *FormatedMetric) { } } -func AssembleSpecificWorkloadMetricRequestInfo(monitoringRequest *prometheus.MonitoringRequestParams, metricName string) (string, string, bool) { +func AssembleSpecificWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string, bool) { nsName := monitoringRequest.NsName - wkName := monitoringRequest.WorkloadName - podsFilter := monitoringRequest.PodsFilter + wlName := monitoringRequest.WorkloadName + podsFilter := monitoringRequest.ResourcesFilter - rule := MakeSpecificWorkloadRule(monitoringRequest.WorkloadKind, wkName, nsName) + rule := MakeSpecificWorkloadRule(monitoringRequest.WorkloadKind, wlName, nsName) paramValues := monitoringRequest.Params params := makeRequestParamString(rule, paramValues) - res := prometheus.SendMonitoringRequest(prometheus.DefaultQueryType, params) + res := client.SendMonitoringRequest(client.DefaultQueryType, params) podNamesFilter := getPodNameRegexInWorkload(res, podsFilter) @@ -261,32 +251,26 @@ func AssembleSpecificWorkloadMetricRequestInfo(monitoringRequest *prometheus.Mon return queryType, params, rule == "" } -func AssembleAllWorkloadMetricRequestInfo(monitoringRequest *prometheus.MonitoringRequestParams, metricName string) (string, string) { +func AssembleAllWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string) { queryType := monitoringRequest.QueryType paramValues := monitoringRequest.Params - rule := MakeWorkloadPromQL(metricName, monitoringRequest.NsName, monitoringRequest.WlFilter) + rule := MakeWorkloadPromQL(metricName, 
monitoringRequest.NsName, monitoringRequest.ResourcesFilter) params := makeRequestParamString(rule, paramValues) return queryType, params } -func AssemblePodMetricRequestInfo(monitoringRequest *prometheus.MonitoringRequestParams, metricName string) (string, string, bool) { +func AssemblePodMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string, bool) { queryType := monitoringRequest.QueryType paramValues := monitoringRequest.Params - rule := MakePodPromQL(metricName, monitoringRequest.NsName, monitoringRequest.NodeId, monitoringRequest.PodName, monitoringRequest.PodsFilter) + rule := MakePodPromQL(metricName, monitoringRequest.NsName, monitoringRequest.NodeId, monitoringRequest.PodName, monitoringRequest.ResourcesFilter) params := makeRequestParamString(rule, paramValues) return queryType, params, rule == "" } -func GetMetric(queryType, params, metricName string) *FormatedMetric { - res := prometheus.SendMonitoringRequest(queryType, params) - formatedMetric := ReformatJson(res, metricName) - return formatedMetric -} - func GetNodeAddressInfo() *map[string][]v1.NodeAddress { nodeLister := informers.SharedInformerFactory().Core().V1().Nodes().Lister() nodes, err := nodeLister.List(labels.Everything()) @@ -319,33 +303,34 @@ func AddNodeAddressMetric(nodeMetric *FormatedMetric, nodeAddress *map[string][] } } -func MonitorContainer(monitoringRequest *prometheus.MonitoringRequestParams, metricName string) *FormatedMetric { +func MonitorContainer(monitoringRequest *client.MonitoringRequestParams, metricName string) *FormatedMetric { queryType, params := AssembleContainerMetricRequestInfo(monitoringRequest, metricName) - res := GetMetric(queryType, params, metricName) + metricsStr := client.SendMonitoringRequest(queryType, params) + res := ReformatJson(metricsStr, metricName, map[string]string{"container_name": ""}) return res } -func AssembleContainerMetricRequestInfo(monitoringRequest *prometheus.MonitoringRequestParams, 
metricName string) (string, string) { +func AssembleContainerMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string) { queryType := monitoringRequest.QueryType paramValues := monitoringRequest.Params - rule := MakeContainerPromQL(monitoringRequest.NsName, monitoringRequest.NodeId, monitoringRequest.PodName, monitoringRequest.ContainerName, metricName, monitoringRequest.ContainersFilter) + rule := MakeContainerPromQL(monitoringRequest.NsName, monitoringRequest.NodeId, monitoringRequest.PodName, monitoringRequest.ContainerName, metricName, monitoringRequest.ResourcesFilter) params := makeRequestParamString(rule, paramValues) return queryType, params } -func AssembleNamespaceMetricRequestInfo(monitoringRequest *prometheus.MonitoringRequestParams, metricName string) (string, string) { +func AssembleNamespaceMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string) { queryType := monitoringRequest.QueryType paramValues := monitoringRequest.Params - rule := MakeNamespacePromQL(monitoringRequest.NsName, monitoringRequest.NsFilter, metricName) + rule := MakeNamespacePromQL(monitoringRequest.NsName, monitoringRequest.ResourcesFilter, metricName) params := makeRequestParamString(rule, paramValues) return queryType, params } -func AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest *prometheus.MonitoringRequestParams, namespaceList []string, metricName string) (string, string) { +func AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, namespaceList []string, metricName string) (string, string) { nsFilter := "^(" + strings.Join(namespaceList, "|") + ")$" @@ -357,7 +342,7 @@ func AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest *prometheus.Mo return queryType, params } -func AssembleAllWorkspaceMetricRequestInfo(monitoringRequest *prometheus.MonitoringRequestParams, namespaceList []string, metricName string) (string, 
string) { +func AssembleAllWorkspaceMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, namespaceList []string, metricName string) (string, string) { var nsFilter = "^()$" if namespaceList != nil { @@ -407,7 +392,7 @@ func filterNamespace(nsFilter string, namespaceList []string) []string { return newNSlist } -func MonitorAllWorkspaces(monitoringRequest *prometheus.MonitoringRequestParams) *FormatedLevelMetric { +func MonitorAllWorkspaces(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric { metricsFilter := monitoringRequest.MetricsFilter if strings.Trim(metricsFilter, " ") == "" { metricsFilter = ".*" @@ -429,7 +414,7 @@ func MonitorAllWorkspaces(monitoringRequest *prometheus.MonitoringRequestParams) wsMap := getAllWorkspaces() for ws := range wsMap { - bol, err := regexp.MatchString(monitoringRequest.WsFilter, ws) + bol, err := regexp.MatchString(monitoringRequest.ResourcesFilter, ws) if err == nil && bol { // a workspace wgAll.Add(1) @@ -470,7 +455,7 @@ func MonitorAllWorkspaces(monitoringRequest *prometheus.MonitoringRequestParams) } } -func collectWorkspaceMetric(monitoringRequest *prometheus.MonitoringRequestParams, ws string, filterMetricsName []string, wgAll *sync.WaitGroup, wsAllch chan *[]FormatedMetric) { +func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, ws string, filterMetricsName []string, wgAll *sync.WaitGroup, wsAllch chan *[]FormatedMetric) { defer wgAll.Done() var wg sync.WaitGroup var ch = make(chan *FormatedMetric, ChannelMaxCapacity) @@ -484,7 +469,8 @@ func collectWorkspaceMetric(monitoringRequest *prometheus.MonitoringRequestParam go func(metricName string) { queryType, params := AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName) - ch <- GetMetric(queryType, params, metricName) + metricsStr := client.SendMonitoringRequest(queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{"namespace": ""}) wg.Done() 
}(metricName) @@ -511,7 +497,7 @@ func collectWorkspaceMetric(monitoringRequest *prometheus.MonitoringRequestParam wsAllch <- &metricsArray } -func MonitorAllMetrics(monitoringRequest *prometheus.MonitoringRequestParams, resourceType string) *FormatedLevelMetric { +func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resourceType string) *FormatedLevelMetric { metricsFilter := monitoringRequest.MetricsFilter if metricsFilter == "" { metricsFilter = ".*" @@ -529,10 +515,8 @@ func MonitorAllMetrics(monitoringRequest *prometheus.MonitoringRequestParams, re wg.Add(1) go func(metricName string) { queryType, params := AssembleClusterMetricRequestInfo(monitoringRequest, metricName) - clusterMetrics := GetMetric(queryType, params, metricName) - - ch <- clusterMetrics - + metricsStr := client.SendMonitoringRequest(queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{"cluster": "local"}) wg.Done() }(metricName) } @@ -546,7 +530,8 @@ func MonitorAllMetrics(monitoringRequest *prometheus.MonitoringRequestParams, re wg.Add(1) go func(metricName string) { queryType, params := AssembleNodeMetricRequestInfo(monitoringRequest, metricName) - ch <- GetMetric(queryType, params, metricName) + metricsStr := client.SendMonitoringRequest(queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{"node": ""}) wg.Done() }(metricName) } @@ -560,7 +545,7 @@ func MonitorAllMetrics(monitoringRequest *prometheus.MonitoringRequestParams, re if err != nil { glog.Errorln(err.Error()) } - namespaceArray = filterNamespace(monitoringRequest.NsFilter, namespaceArray) + namespaceArray = filterNamespace(monitoringRequest.ResourcesFilter, namespaceArray) if monitoringRequest.Tp == "rank" { for _, metricName := range NamespaceMetricsNames { @@ -570,12 +555,13 @@ func MonitorAllMetrics(monitoringRequest *prometheus.MonitoringRequestParams, re bol, err := regexp.MatchString(metricsFilter, metricName) ns := "^(" + 
strings.Join(namespaceArray, "|") + ")$" - monitoringRequest.NsFilter = ns + monitoringRequest.ResourcesFilter = ns if err == nil && bol { wg.Add(1) go func(metricName string) { queryType, params := AssembleNamespaceMetricRequestInfo(monitoringRequest, metricName) - ch <- GetMetric(queryType, params, metricName) + metricsStr := client.SendMonitoringRequest(queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{"workspace": "workspace"}) wg.Done() }(metricName) } @@ -593,7 +579,8 @@ func MonitorAllMetrics(monitoringRequest *prometheus.MonitoringRequestParams, re wg.Add(1) go func(metricName string) { queryType, params := AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName) - ch <- GetMetric(queryType, params, metricName) + metricsStr := client.SendMonitoringRequest(queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{"workspace": "workspace"}) wg.Done() }(metricName) } @@ -610,9 +597,8 @@ func MonitorAllMetrics(monitoringRequest *prometheus.MonitoringRequestParams, re go func(metricName string) { queryType, params := AssembleAllWorkspaceMetricRequestInfo(monitoringRequest, nil, metricName) - - ch <- GetMetric(queryType, params, metricName) - + metricsStr := client.SendMonitoringRequest(queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{"workspace": "workspaces"}) wg.Done() }(metricName) } @@ -627,7 +613,8 @@ func MonitorAllMetrics(monitoringRequest *prometheus.MonitoringRequestParams, re wg.Add(1) go func(metricName string) { queryType, params := AssembleNamespaceMetricRequestInfo(monitoringRequest, metricName) - ch <- GetMetric(queryType, params, metricName) + metricsStr := client.SendMonitoringRequest(queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{"namespace": ""}) wg.Done() }(metricName) } @@ -635,15 +622,15 @@ func MonitorAllMetrics(monitoringRequest *prometheus.MonitoringRequestParams, re } case 
MetricLevelWorkload: { - if monitoringRequest.Tp == "rank" { + if monitoringRequest.WorkloadName == "" { for _, metricName := range WorkloadMetricsNames { bol, err := regexp.MatchString(metricsFilter, metricName) if err == nil && bol { wg.Add(1) go func(metricName string) { queryType, params := AssembleAllWorkloadMetricRequestInfo(monitoringRequest, metricName) - fmtMetrics := GetMetric(queryType, params, metricName) - ch <- fmtMetrics + metricsStr := client.SendMonitoringRequest(queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{"workload": ""}) wg.Done() }(metricName) } @@ -657,7 +644,8 @@ func MonitorAllMetrics(monitoringRequest *prometheus.MonitoringRequestParams, re metricName = strings.TrimLeft(metricName, "workload_") queryType, params, nullRule := AssembleSpecificWorkloadMetricRequestInfo(monitoringRequest, metricName) if !nullRule { - fmtMetrics := GetMetric(queryType, params, metricName) + metricsStr := client.SendMonitoringRequest(queryType, params) + fmtMetrics := ReformatJson(metricsStr, metricName, map[string]string{"pod_name": ""}) unifyMetricHistoryTimeRange(fmtMetrics) ch <- fmtMetrics } @@ -676,7 +664,8 @@ func MonitorAllMetrics(monitoringRequest *prometheus.MonitoringRequestParams, re go func(metricName string) { queryType, params, nullRule := AssemblePodMetricRequestInfo(monitoringRequest, metricName) if !nullRule { - ch <- GetMetric(queryType, params, metricName) + metricsStr := client.SendMonitoringRequest(queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{"pod_name": ""}) } else { ch <- nil } @@ -693,7 +682,8 @@ func MonitorAllMetrics(monitoringRequest *prometheus.MonitoringRequestParams, re wg.Add(1) go func(metricName string) { queryType, params := AssembleContainerMetricRequestInfo(monitoringRequest, metricName) - ch <- GetMetric(queryType, params, metricName) + metricsStr := client.SendMonitoringRequest(queryType, params) + ch <- ReformatJson(metricsStr, metricName, 
map[string]string{"container_name": ""}) wg.Done() }(metricName) } @@ -868,125 +858,7 @@ func getSpecificMetricItem(timestamp int64, metricName string, resource string, return &nsMetrics } -// k8s component(controller, scheduler, etcd) status -func MonitorComponentStatus(monitoringRequest *prometheus.MonitoringRequestParams) *[]interface{} { - componentList, err := k8s.Client().CoreV1().ComponentStatuses().List(metaV1.ListOptions{}) - if err != nil { - glog.Errorln(err.Error()) - } - - var componentStatusList []*ComponentStatus - for _, item := range componentList.Items { - var status []OneComponentStatus - for _, cond := range item.Conditions { - status = append(status, OneComponentStatus{ - Type: string(cond.Type), - Status: string(cond.Status), - Message: cond.Message, - Error: cond.Error, - }) - } - - componentStatusList = append(componentStatusList, &ComponentStatus{ - Name: item.Name, - Namespace: item.Namespace, - Labels: item.Labels, - ComponentStatus: status, - }) - } - - // node status - queryType := monitoringRequest.QueryType - paramValues := monitoringRequest.Params - paramValues.Set("query", NodeStatusRule) - params := paramValues.Encode() - res := prometheus.SendMonitoringRequest(queryType, params) - - nodeStatusMetric := ReformatJson(res, "node_status", nodeStatusDelLabel...) 
- nodeStatusMetric = ReformatNodeStatusField(nodeStatusMetric) - - var normalNodes []string - var abnormalNodes []string - for _, result := range nodeStatusMetric.Data.Result { - tmap, sure := result[ResultItemMetric].(map[string]interface{}) - - if sure { - if tmap[MetricStatus].(string) == "false" { - abnormalNodes = append(abnormalNodes, tmap[MetricLevelNode].(string)) - } else { - normalNodes = append(normalNodes, tmap[MetricLevelNode].(string)) - } - } - } - - Components, err := components.GetAllComponentsStatus() - - if err != nil { - glog.Error(err.Error()) - } - - var namespaceComponentHealthyMap = make(map[string]int) - var namespaceComponentTotalMap = make(map[string]int) - - for _, ns := range constants.SystemNamespaces { - nsStatus, exist := Components[ns] - if exist { - for _, nsStatusItem := range nsStatus.(map[string]interface{}) { - component := nsStatusItem.(models.Component) - namespaceComponentTotalMap[ns] += 1 - if component.HealthyBackends != 0 && component.HealthyBackends == component.TotalBackends { - namespaceComponentHealthyMap[ns] += 1 - } - } - } - } - - timestamp := int64(time.Now().Unix()) - - onlineMetricItems := makeMetricItems(timestamp, namespaceComponentHealthyMap, MetricLevelNamespace) - metricItems := makeMetricItems(timestamp, namespaceComponentTotalMap, MetricLevelNamespace) - - var assembleList []interface{} - assembleList = append(assembleList, nodeStatusMetric) - - for _, statusItem := range componentStatusList { - assembleList = append(assembleList, statusItem) - } - - assembleList = append(assembleList, FormatedMetric{ - Data: FormatedMetricData{ - Result: *onlineMetricItems, - ResultType: ResultTypeVector, - }, - MetricName: MetricNameComponentOnLine, - Status: MetricStatusSuccess, - }) - - assembleList = append(assembleList, FormatedMetric{ - Data: FormatedMetricData{ - Result: *metricItems, - ResultType: ResultTypeVector, - }, - MetricName: MetricNameComponentLine, - Status: MetricStatusSuccess, - }) - - return 
&assembleList -} - -func makeMetricItems(timestamp int64, statusMap map[string]int, resourceType string) *[]map[string]interface{} { - var metricItems []map[string]interface{} - - for ns, count := range statusMap { - metricItems = append(metricItems, map[string]interface{}{ - ResultItemMetric: map[string]string{resourceType: ns}, - ResultItemValue: []interface{}{timestamp, fmt.Sprintf("%d", count)}, - }) - } - return &metricItems -} - -func AssembleClusterMetricRequestInfo(monitoringRequest *prometheus.MonitoringRequestParams, metricName string) (string, string) { +func AssembleClusterMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string) { queryType := monitoringRequest.QueryType paramValues := monitoringRequest.Params rule := MakeClusterRule(metricName) @@ -995,10 +867,10 @@ func AssembleClusterMetricRequestInfo(monitoringRequest *prometheus.MonitoringRe return queryType, params } -func AssembleNodeMetricRequestInfo(monitoringRequest *prometheus.MonitoringRequestParams, metricName string) (string, string) { +func AssembleNodeMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string) { queryType := monitoringRequest.QueryType paramValues := monitoringRequest.Params - rule := MakeNodeRule(monitoringRequest.NodeId, monitoringRequest.NodesFilter, metricName) + rule := MakeNodeRule(monitoringRequest.NodeId, monitoringRequest.ResourcesFilter, metricName) params := makeRequestParamString(rule, paramValues) return queryType, params diff --git a/pkg/models/metrics/metricsruleconst.go b/pkg/models/metrics/metricsruleconst.go index 62331052fd58302a85db3e35a9d8e4ef965ce720..72f8adc522040a839015468ec89503c64b6ee738 100644 --- a/pkg/models/metrics/metricsruleconst.go +++ b/pkg/models/metrics/metricsruleconst.go @@ -318,14 +318,14 @@ var RulePromQLTmplMap = MetricMap{ "cluster_disk_read_throughput": "sum(node:data_volume_throughput_bytes_read:sum)", "cluster_disk_write_throughput": 
"sum(node:data_volume_throughput_bytes_written:sum)", - "cluster_disk_size_usage": `sum(max((node_filesystem_size{device=~"/dev/.+", job="node-exporter"} - node_filesystem_avail{device=~"/dev/.+", job="node-exporter"}) * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:) by (node))`, - "cluster_disk_size_utilisation": `1 - sum(max(node_filesystem_avail{device=~"/dev/.+", job="node-exporter"} * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:) by (node)) / sum(max(node_filesystem_size{device=~"/dev/.+", job="node-exporter"} * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:) by (node))`, - "cluster_disk_size_capacity": `sum(max(node_filesystem_size{device=~"/dev/.+", job="node-exporter"} * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:) by (node))`, - "cluster_disk_size_available": `sum(max(node_filesystem_avail{device=~"/dev/.+", job="node-exporter"} * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:) by (node))`, + "cluster_disk_size_usage": `sum(max((node_filesystem_size_bytes{device=~"/dev/.+", job="node-exporter"} - node_filesystem_avail_bytes{device=~"/dev/.+", job="node-exporter"}) * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:) by (node))`, + "cluster_disk_size_utilisation": `1 - sum(max(node_filesystem_avail_bytes{device=~"/dev/.+", job="node-exporter"} * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:) by (node)) / sum(max(node_filesystem_size_bytes{device=~"/dev/.+", job="node-exporter"} * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:) by (node))`, + "cluster_disk_size_capacity": `sum(max(node_filesystem_size_bytes{device=~"/dev/.+", job="node-exporter"} * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:) by (node))`, + "cluster_disk_size_available": `sum(max(node_filesystem_avail_bytes{device=~"/dev/.+", job="node-exporter"} * on (namespace, pod) 
group_left(node) node_namespace_pod:kube_pod_info:) by (node))`, - "cluster_disk_inode_total": `sum(node:disk_inodes_total:)`, - "cluster_disk_inode_usage": `sum(node:disk_inodes_total:) - sum(node:disk_inodes_free:)`, - "cluster_disk_inode_utilisation": `1 - sum(node:disk_inodes_free:) / sum(node:disk_inodes_total:)`, + "cluster_disk_inode_total": `sum(node:node_inodes_total:)`, + "cluster_disk_inode_usage": `sum(node:node_inodes_total:) - sum(node:node_inodes_free:)`, + "cluster_disk_inode_utilisation": `1 - sum(node:node_inodes_free:) / sum(node:node_inodes_total:)`, "cluster_namespace_count": `count(kube_namespace_annotations)`, @@ -396,14 +396,14 @@ var RulePromQLTmplMap = MetricMap{ "node_disk_read_throughput": "node:data_volume_throughput_bytes_read:sum", "node_disk_write_throughput": "node:data_volume_throughput_bytes_written:sum", - "node_disk_size_capacity": `max(node_filesystem_size{device=~"/dev/.+", job="node-exporter"} * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:$1) by (node)`, - "node_disk_size_available": `max(node_filesystem_avail{device=~"/dev/.+", job="node-exporter"} * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:$1) by (node)`, - "node_disk_size_usage": `max((node_filesystem_size{device=~"/dev/.+", job="node-exporter"} - node_filesystem_avail{device=~"/dev/.+", job="node-exporter"}) * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:$1) by (node)`, - "node_disk_size_utilisation": `max(((node_filesystem_size{device=~"/dev/.+", job="node-exporter"} - node_filesystem_avail{device=~"/dev/.+", job="node-exporter"}) / node_filesystem_size{device=~"/dev/.+", job="node-exporter"}) * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:$1) by (node)`, + "node_disk_size_capacity": `max(node_filesystem_size_bytes{device=~"/dev/.+", job="node-exporter"} * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:$1) by (node)`, + 
"node_disk_size_available": `max(node_filesystem_avail_bytes{device=~"/dev/.+", job="node-exporter"} * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:$1) by (node)`, + "node_disk_size_usage": `max((node_filesystem_size_bytes{device=~"/dev/.+", job="node-exporter"} - node_filesystem_avail_bytes{device=~"/dev/.+", job="node-exporter"}) * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:$1) by (node)`, + "node_disk_size_utilisation": `max(((node_filesystem_size_bytes{device=~"/dev/.+", job="node-exporter"} - node_filesystem_avail_bytes{device=~"/dev/.+", job="node-exporter"}) / node_filesystem_size_bytes{device=~"/dev/.+", job="node-exporter"}) * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:$1) by (node)`, - "node_disk_inode_total": `node:disk_inodes_total:$1`, - "node_disk_inode_usage": `node:disk_inodes_total:$1 - node:disk_inodes_free:$1`, - "node_disk_inode_utilisation": `(1 - (node:disk_inodes_free:$1 / node:disk_inodes_total:$1))`, + "node_disk_inode_total": `node:node_inodes_total:$1`, + "node_disk_inode_usage": `node:node_inodes_total:$1 - node:node_inodes_free:$1`, + "node_disk_inode_utilisation": `(1 - (node:node_inodes_free:$1 / node:node_inodes_total:$1))`, "node_pod_count": `sum by (node) ((kube_pod_status_scheduled{condition="true"} > 0) * on (namespace, pod) group_left(node) kube_pod_info$1 unless on (node) (kube_node_status_condition{condition="Ready",status=~"unknown|false"} > 0))`, "node_pod_quota": `sum(kube_node_status_capacity_pods$1) by (node) unless on (node) (kube_node_status_condition{condition="Ready",status=~"unknown|false"} > 0)`, diff --git a/pkg/models/metrics/namespaces.go b/pkg/models/metrics/namespaces.go index cb707a6064a4758c793af45b67733abb6aa342fe..06abbddb10046e3bd7711e31efd6c675ff910cf5 100644 --- a/pkg/models/metrics/namespaces.go +++ b/pkg/models/metrics/namespaces.go @@ -23,7 +23,7 @@ import ( "k8s.io/api/core/v1" - 
"kubesphere.io/kubesphere/pkg/simple/client/prometheus" + prom "kubesphere.io/kubesphere/pkg/simple/client/prometheus" ) func GetNamespacesWithMetrics(namespaces []*v1.Namespace) []*v1.Namespace { @@ -34,11 +34,11 @@ func GetNamespacesWithMetrics(namespaces []*v1.Namespace) []*v1.Namespace { nsFilter := "^(" + strings.Join(nsNameList, "|") + ")$" var timeRelateParams = make(url.Values) - params := prometheus.MonitoringRequestParams{ - NsFilter: nsFilter, - Params: timeRelateParams, - QueryType: prometheus.DefaultQueryType, - MetricsFilter: "namespace_cpu_usage|namespace_memory_usage_wo_cache|namespace_pod_count", + params := prom.MonitoringRequestParams{ + ResourcesFilter: nsFilter, + Params: timeRelateParams, + QueryType: prom.DefaultQueryType, + MetricsFilter: "namespace_cpu_usage|namespace_memory_usage_wo_cache|namespace_pod_count", } rawMetrics := MonitorAllMetrics(¶ms, MetricLevelNamespace) diff --git a/pkg/models/metrics/util.go b/pkg/models/metrics/util.go index f5eb23b09fb4d262766b4b697790721416144f17..cdf440cd78d74ae0ef9c7ae98bea87dc1faab04a 100644 --- a/pkg/models/metrics/util.go +++ b/pkg/models/metrics/util.go @@ -50,7 +50,7 @@ func (wrapper FormatedMetricDataWrapper) Swap(i, j int) { } // sorted metric by ascending or descending order -func Sort(sortMetricName string, sortType string, fmtLevelMetric *FormatedLevelMetric, resourceType string) (*FormatedLevelMetric, int) { +func Sort(sortMetricName string, sortType string, fmtLevelMetric *FormatedLevelMetric) (*FormatedLevelMetric, int) { defer func() { if err := recover(); err != nil { glog.Errorln(err) @@ -83,8 +83,8 @@ func Sort(sortMetricName string, sortType string, fmtLevelMetric *FormatedLevelM v1, _ := strconv.ParseFloat(value1[len(value1)-1].(string), 64) v2, _ := strconv.ParseFloat(value2[len(value2)-1].(string), 64) if v1 == v2 { - resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})[resourceType] - resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})[resourceType] + 
resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})["resource_name"] + resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})["resource_name"] return resourceName1.(string) < resourceName2.(string) } @@ -99,8 +99,8 @@ func Sort(sortMetricName string, sortType string, fmtLevelMetric *FormatedLevelM v2, _ := strconv.ParseFloat(value2[len(value2)-1].(string), 64) if v1 == v2 { - resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})[resourceType] - resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})[resourceType] + resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})["resource_name"] + resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})["resource_name"] return resourceName1.(string) > resourceName2.(string) } @@ -111,7 +111,7 @@ func Sort(sortMetricName string, sortType string, fmtLevelMetric *FormatedLevelM for _, r := range metricItem.Data.Result { // for some reasons, 'metric' may not contain `resourceType` field // example: {"metric":{},"value":[1541142931.731,"3"]} - k, exist := r[ResultItemMetric].(map[string]interface{})[resourceType] + k, exist := r[ResultItemMetric].(map[string]interface{})["resource_name"] key := k.(string) if exist { if _, exist := indexMap[key]; !exist { @@ -125,7 +125,7 @@ func Sort(sortMetricName string, sortType string, fmtLevelMetric *FormatedLevelM // iterator all metric to find max metricItems length for _, r := range metricItem.Data.Result { - k, ok := r[ResultItemMetric].(map[string]interface{})[resourceType] + k, ok := r[ResultItemMetric].(map[string]interface{})["resource_name"] if ok { currentResourceMap[k.(string)] = 1 } @@ -154,7 +154,7 @@ func Sort(sortMetricName string, sortType string, fmtLevelMetric *FormatedLevelM sortedMetric := make([]map[string]interface{}, len(indexMap)) for j := 0; j < len(re.Data.Result); j++ { r := re.Data.Result[j] - k, exist := r[ResultItemMetric].(map[string]interface{})[resourceType] + k, exist := 
r[ResultItemMetric].(map[string]interface{})["resource_name"] if exist { index, exist := indexMap[k.(string)] if exist { @@ -176,7 +176,7 @@ func Page(pageNum string, limitNum string, fmtLevelMetric *FormatedLevelMetric, } // matrix type can not be sorted for _, metricItem := range fmtLevelMetric.Results { - // if metric reterieved field, resultType is "" + // if metric reterieved field, resultType: "" if metricItem.Data.ResultType == ResultTypeMatrix { return fmtLevelMetric } @@ -251,7 +251,7 @@ func Page(pageNum string, limitNum string, fmtLevelMetric *FormatedLevelMetric, } // maybe this function is time consuming -func ReformatJson(metric string, metricsName string, needDelParams ...string) *FormatedMetric { +func ReformatJson(metric string, metricsName string, needAddParams map[string]string, needDelParams ...string) *FormatedMetric { var formatMetric FormatedMetric err := jsonIter.Unmarshal([]byte(metric), &formatMetric) @@ -277,6 +277,17 @@ func ReformatJson(metric string, metricsName string, needDelParams ...string) *F delete(metricMap, p) } } + + if needAddParams != nil && len(needAddParams) > 0 { + for n := range needAddParams { + if v, ok := metricMap[n]; ok { + delete(metricMap, n) + metricMap["resource_name"] = v + } else { + metricMap["resource_name"] = needAddParams[n] + } + } + } } } diff --git a/pkg/simple/client/elasticsearch/esclient.go b/pkg/simple/client/elasticsearch/esclient.go new file mode 100644 index 0000000000000000000000000000000000000000..b82f1db98edf6ff84eb9bcbc805073b6586fd5fa --- /dev/null +++ b/pkg/simple/client/elasticsearch/esclient.go @@ -0,0 +1,638 @@ +/* +Copyright 2018 The KubeSphere Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package esclient + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/json-iterator/go" +) + +var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary + +var ( + mu sync.RWMutex + esConfigs *ESConfigs +) + +type ESConfigs struct { + Host string + Port string + Index string +} + +func readESConfigs() *ESConfigs { + mu.RLock() + defer mu.RUnlock() + + return esConfigs +} + +func (configs *ESConfigs) WriteESConfigs() { + mu.Lock() + defer mu.Unlock() + + esConfigs = configs +} + +type Request struct { + From int64 `json:"from"` + Size int64 `json:"size"` + Sorts []Sort `json:"sort,omitempty"` + MainQuery MainQuery `json:"query"` + Aggs interface{} `json:"aggs,omitempty"` + MainHighLight MainHighLight `json:"highlight,omitempty"` +} + +type Sort struct { + Order Order `json:"time"` +} + +type Order struct { + Order string `json:"order"` +} + +type MainQuery struct { + MainBoolQuery MainBoolQuery `json:"bool"` +} + +type MainBoolQuery struct { + MainFilter MainFilter `json:"filter"` +} + +type MainFilter struct { + FilterBoolQuery FilterBoolQuery `json:"bool"` +} + +type FilterBoolQuery struct { + Musts []interface{} `json:"must"` +} + +type RangeQuery struct { + RangeSpec RangeSpec `json:"range"` +} + +type RangeSpec struct { + TimeRange TimeRange `json:"time"` +} + +type TimeRange struct { + Gte string `json:"gte,omitempty"` + Lte string `json:"lte,omitempty"` +} + +type BoolShouldMatchPhrase struct { + ShouldMatchPhrase ShouldMatchPhrase `json:"bool"` +} + +type ShouldMatchPhrase struct { + 
Shoulds []interface{} `json:"should"` + MinimumShouldMatch int64 `json:"minimum_should_match"` +} + +type MatchPhrase struct { + MatchPhrase interface{} `json:"match_phrase"` +} + +type Match struct { + Match interface{} `json:"match"` +} + +type QueryWord struct { + Word string `json:"query"` +} + +type MainHighLight struct { + Fields []interface{} `json:"fields,omitempty"` +} + +type LogHighLightField struct { + FieldContent EmptyField `json:"log"` +} + +type NamespaceHighLightField struct { + FieldContent EmptyField `json:"kubernetes.namespace_name.keyword"` +} + +type PodHighLightField struct { + FieldContent EmptyField `json:"kubernetes.pod_name.keyword"` +} + +type ContainerHighLightField struct { + FieldContent EmptyField `json:"kubernetes.container_name.keyword"` +} + +type EmptyField struct { +} + +type StatisticsAggs struct { + NamespaceAgg NamespaceAgg `json:"Namespace"` +} + +type NamespaceAgg struct { + Terms StatisticsAggTerm `json:"terms"` + ContainerAggs ContainerAggs `json:"aggs"` +} + +type ContainerAggs struct { + ContainerAgg ContainerAgg `json:"Container"` +} + +type ContainerAgg struct { + Terms StatisticsAggTerm `json:"terms"` +} + +type StatisticsAggTerm struct { + Field string `json:"field"` + Size int64 `json:"size"` +} + +type HistogramAggs struct { + HistogramAgg HistogramAgg `json:"histogram"` +} + +type HistogramAgg struct { + DateHistogram DateHistogram `json:"date_histogram"` +} + +type DateHistogram struct { + Field string `json:"field"` + Interval string `json:"interval"` +} + +func createQueryRequest(param QueryParameters) (int, []byte, error) { + var request Request + var mainBoolQuery MainBoolQuery + + if param.NamespaceFilled { + var shouldMatchPhrase ShouldMatchPhrase + if len(param.Namespaces) == 0 { + matchPhrase := MatchPhrase{map[string]interface{}{"kubernetes.namespace_name.key_word": QueryWord{""}}} + shouldMatchPhrase.Shoulds = append(shouldMatchPhrase.Shoulds, matchPhrase) + } else { + for _, namespace := range 
param.Namespaces { + matchPhrase := MatchPhrase{map[string]interface{}{"kubernetes.namespace_name.keyword": QueryWord{namespace}}} + shouldMatchPhrase.Shoulds = append(shouldMatchPhrase.Shoulds, matchPhrase) + } + } + shouldMatchPhrase.MinimumShouldMatch = 1 + mainBoolQuery.MainFilter.FilterBoolQuery.Musts = append(mainBoolQuery.MainFilter.FilterBoolQuery.Musts, BoolShouldMatchPhrase{shouldMatchPhrase}) + } + if param.PodFilled { + var shouldMatchPhrase ShouldMatchPhrase + if len(param.Pods) == 0 { + matchPhrase := MatchPhrase{map[string]interface{}{"kubernetes.pod_name.key_word": QueryWord{""}}} + shouldMatchPhrase.Shoulds = append(shouldMatchPhrase.Shoulds, matchPhrase) + } else { + for _, pod := range param.Pods { + matchPhrase := MatchPhrase{map[string]interface{}{"kubernetes.pod_name.keyword": QueryWord{pod}}} + shouldMatchPhrase.Shoulds = append(shouldMatchPhrase.Shoulds, matchPhrase) + } + } + shouldMatchPhrase.MinimumShouldMatch = 1 + mainBoolQuery.MainFilter.FilterBoolQuery.Musts = append(mainBoolQuery.MainFilter.FilterBoolQuery.Musts, BoolShouldMatchPhrase{shouldMatchPhrase}) + } + if param.ContainerFilled { + var shouldMatchPhrase ShouldMatchPhrase + if len(param.Containers) == 0 { + matchPhrase := MatchPhrase{map[string]interface{}{"kubernetes.container_name.key_word": QueryWord{""}}} + shouldMatchPhrase.Shoulds = append(shouldMatchPhrase.Shoulds, matchPhrase) + } else { + for _, container := range param.Containers { + matchPhrase := MatchPhrase{map[string]interface{}{"kubernetes.container_name.keyword": QueryWord{container}}} + shouldMatchPhrase.Shoulds = append(shouldMatchPhrase.Shoulds, matchPhrase) + } + } + shouldMatchPhrase.MinimumShouldMatch = 1 + mainBoolQuery.MainFilter.FilterBoolQuery.Musts = append(mainBoolQuery.MainFilter.FilterBoolQuery.Musts, BoolShouldMatchPhrase{shouldMatchPhrase}) + } + + if param.NamespaceQuery != "" { + match := Match{map[string]interface{}{"kubernetes.namespace_name": QueryWord{param.NamespaceQuery}}} + 
mainBoolQuery.MainFilter.FilterBoolQuery.Musts = append(mainBoolQuery.MainFilter.FilterBoolQuery.Musts, match) + } + if param.PodQuery != "" { + match := Match{map[string]interface{}{"kubernetes.pod_name": QueryWord{param.PodQuery}}} + mainBoolQuery.MainFilter.FilterBoolQuery.Musts = append(mainBoolQuery.MainFilter.FilterBoolQuery.Musts, match) + } + if param.ContainerQuery != "" { + match := Match{map[string]interface{}{"kubernetes.container_name": QueryWord{param.ContainerQuery}}} + mainBoolQuery.MainFilter.FilterBoolQuery.Musts = append(mainBoolQuery.MainFilter.FilterBoolQuery.Musts, match) + } + + if param.LogQuery != "" { + match := Match{map[string]interface{}{"log": QueryWord{param.LogQuery}}} + mainBoolQuery.MainFilter.FilterBoolQuery.Musts = append(mainBoolQuery.MainFilter.FilterBoolQuery.Musts, match) + } + + rangeQuery := RangeQuery{RangeSpec{TimeRange{param.StartTime, param.EndTime}}} + mainBoolQuery.MainFilter.FilterBoolQuery.Musts = append(mainBoolQuery.MainFilter.FilterBoolQuery.Musts, rangeQuery) + + var operation int + + if param.Operation == "statistics" { + operation = OperationStatistics + containerAggs := ContainerAggs{ContainerAgg{StatisticsAggTerm{"kubernetes.container_name.keyword", 2147483647}}} + namespaceAgg := NamespaceAgg{StatisticsAggTerm{"kubernetes.namespace_name.keyword", 2147483647}, containerAggs} + request.Aggs = StatisticsAggs{namespaceAgg} + request.Size = 0 + } else if param.Operation == "histogram" { + operation = OperationHistogram + var interval string + if param.Interval != "" { + interval = param.Interval + } else { + interval = "15m" + } + param.Interval = interval + request.Aggs = HistogramAggs{HistogramAgg{DateHistogram{"time", interval}}} + request.Size = 0 + } else { + operation = OperationQuery + request.From = param.From + request.Size = param.Size + var order string + if strings.Compare(strings.ToLower(param.Sort), "asc") == 0 { + order = "asc" + } else { + order = "desc" + } + request.Sorts = 
append(request.Sorts, Sort{Order{order}}) + + var mainHighLight MainHighLight + mainHighLight.Fields = append(mainHighLight.Fields, LogHighLightField{}) + mainHighLight.Fields = append(mainHighLight.Fields, NamespaceHighLightField{}) + mainHighLight.Fields = append(mainHighLight.Fields, PodHighLightField{}) + mainHighLight.Fields = append(mainHighLight.Fields, ContainerHighLightField{}) + request.MainHighLight = mainHighLight + } + + request.MainQuery = MainQuery{mainBoolQuery} + + queryRequest, err := json.Marshal(request) + + return operation, queryRequest, err +} + +type Response struct { + Status int `json:"status"` + Workspace string `json:"workspace,omitempty"` + Shards Shards `json:"_shards"` + Hits Hits `json:"hits"` + Aggregations json.RawMessage `json:"aggregations"` +} + +type Shards struct { + Total int64 `json:"total"` + Successful int64 `json:"successful"` + Skipped int64 `json:"skipped"` + Failed int64 `json:"failed"` +} + +type Hits struct { + Total int64 `json:"total"` + Hits []Hit `json:"hits"` +} + +type Hit struct { + Source Source `json:"_source"` + HighLight HighLight `json:"highlight"` +} + +type Source struct { + Log string `json:"log"` + Time string `json:"time"` + Kubernetes Kubernetes `json:"kubernetes"` +} + +type Kubernetes struct { + Namespace string `json:"namespace_name"` + Pod string `json:"pod_name"` + Container string `json:"container_name"` + Host string `json:"host"` +} + +type HighLight struct { + LogHighLights []string `json:"log,omitempty"` + NamespaceHighLights []string `json:"kubernetes.namespace_name.keyword,omitempty"` + PodHighLights []string `json:"kubernetes.pod_name.keyword,omitempty"` + ContainerHighLights []string `json:"kubernetes.container_name.keyword,omitempty"` +} + +type LogRecord struct { + Time int64 `json:"time,omitempty"` + Log string `json:"log,omitempty"` + Namespace string `json:"namespace,omitempty"` + Pod string `json:"pod,omitempty"` + Container string `json:"container,omitempty"` + Host string 
`json:"host,omitempty"` + HighLight HighLight `json:"highlight,omitempty"` +} + +type ReadResult struct { + Total int64 `json:"total"` + From int64 `json:"from"` + Size int64 `json:"size"` + Records []LogRecord `json:"records,omitempty"` +} + +type NamespaceAggregations struct { + NamespaceAggregation NamespaceAggregation `json:"Namespace"` +} + +type NamespaceAggregation struct { + Namespaces []NamespaceStatistics `json:"buckets"` +} + +type NamespaceStatistics struct { + Namespace string `json:"Key"` + Count int64 `json:"doc_count"` + ContainerAggregation ContainerAggregation `json:"Container"` +} + +type ContainerAggregation struct { + Containers []ContainerStatistics `json:"buckets"` +} + +type ContainerStatistics struct { + Container string `json:"Key"` + Count int64 `json:"doc_count"` +} + +type NamespaceResult struct { + Namespace string `json:"namespace"` + Count int64 `json:"count"` + Containers []ContainerResult `json:"containers"` +} + +type ContainerResult struct { + Container string `json:"container"` + Count int64 `json:"count"` +} + +type StatisticsResult struct { + Total int64 `json:"total"` + Namespaces []NamespaceResult `json:"namespaces"` +} + +type HistogramAggregations struct { + HistogramAggregation HistogramAggregation `json:"histogram"` +} + +type HistogramAggregation struct { + Histograms []HistogramStatistics `json:"buckets"` +} + +type HistogramStatistics struct { + Time int64 `json:"key"` + Count int64 `json:"doc_count"` +} + +type HistogramRecord struct { + Time int64 `json:"time"` + Count int64 `json:"count"` +} + +type HistogramResult struct { + Total int64 `json:"total"` + StartTime int64 `json:"start_time"` + EndTime int64 `json:"end_time"` + Interval string `json:"interval"` + Histograms []HistogramRecord `json:"histograms"` +} + +type QueryResult struct { + Status int `json:"status,omitempty"` + Workspace string `json:"workspace,omitempty"` + Read *ReadResult `json:"query,omitempty"` + Statistics *StatisticsResult 
`json:"statistics,omitempty"` + Histogram *HistogramResult `json:"histogram,omitempty"` + Request string `json:"request,omitempty"` + Response string `json:"response,omitempty"` +} + +const ( + OperationQuery int = iota + OperationStatistics + OperationHistogram +) + +func calcTimestamp(input string) int64 { + var t time.Time + var err error + var ret int64 + + ret = 0 + + t, err = time.Parse(time.RFC3339, input) + if err != nil { + var i int64 + i, err = strconv.ParseInt(input, 10, 64) + if err == nil { + ret = time.Unix(i/1000, (i%1000)*1000000).UnixNano() / 1000000 + } + } else { + ret = t.UnixNano() / 1000000 + } + + return ret +} + +func parseQueryResult(operation int, param QueryParameters, body []byte, query []byte) *QueryResult { + var queryResult QueryResult + //queryResult.Request = string(query) + //queryResult.Response = string(body) + + var response Response + err := jsonIter.Unmarshal(body, &response) + if err != nil { + //fmt.Println("Parse response error ", err.Error()) + queryResult.Status = http.StatusNotFound + return &queryResult + } + + if response.Status != 0 { + //Elastic error, eg, es_rejected_execute_exception + queryResult.Status = response.Status + return &queryResult + } + + if response.Shards.Successful != response.Shards.Total { + //Elastic some shards error + queryResult.Status = http.StatusInternalServerError + return &queryResult + } + + switch operation { + case OperationQuery: + var readResult ReadResult + readResult.Total = response.Hits.Total + readResult.From = param.From + readResult.Size = param.Size + for _, hit := range response.Hits.Hits { + var logRecord LogRecord + logRecord.Time = calcTimestamp(hit.Source.Time) + logRecord.Log = hit.Source.Log + logRecord.Namespace = hit.Source.Kubernetes.Namespace + logRecord.Pod = hit.Source.Kubernetes.Pod + logRecord.Container = hit.Source.Kubernetes.Container + logRecord.Host = hit.Source.Kubernetes.Host + logRecord.HighLight = hit.HighLight + readResult.Records = 
append(readResult.Records, logRecord) + } + queryResult.Read = &readResult + + case OperationStatistics: + var statisticsResult StatisticsResult + statisticsResult.Total = response.Hits.Total + + var namespaceAggregations NamespaceAggregations + jsonIter.Unmarshal(response.Aggregations, &namespaceAggregations) + + for _, namespace := range namespaceAggregations.NamespaceAggregation.Namespaces { + var namespaceResult NamespaceResult + namespaceResult.Namespace = namespace.Namespace + namespaceResult.Count = namespace.Count + + for _, container := range namespace.ContainerAggregation.Containers { + var containerResult ContainerResult + containerResult.Container = container.Container + containerResult.Count = container.Count + namespaceResult.Containers = append(namespaceResult.Containers, containerResult) + } + + statisticsResult.Namespaces = append(statisticsResult.Namespaces, namespaceResult) + } + + queryResult.Statistics = &statisticsResult + + case OperationHistogram: + var histogramResult HistogramResult + histogramResult.Total = response.Hits.Total + histogramResult.StartTime = calcTimestamp(param.StartTime) + histogramResult.EndTime = calcTimestamp(param.EndTime) + histogramResult.Interval = param.Interval + + var histogramAggregations HistogramAggregations + jsonIter.Unmarshal(response.Aggregations, &histogramAggregations) + for _, histogram := range histogramAggregations.HistogramAggregation.Histograms { + var histogramRecord HistogramRecord + histogramRecord.Time = histogram.Time + histogramRecord.Count = histogram.Count + + histogramResult.Histograms = append(histogramResult.Histograms, histogramRecord) + } + + queryResult.Histogram = &histogramResult + } + + queryResult.Status = http.StatusOK + queryResult.Workspace = param.Workspace + + return &queryResult +} + +type QueryParameters struct { + NamespaceFilled bool + Namespaces []string + PodFilled bool + Pods []string + ContainerFilled bool + Containers []string + + NamespaceQuery string + PodQuery 
string + ContainerQuery string + + Workspace string + + Operation string + LogQuery string + Interval string + StartTime string + EndTime string + Sort string + From int64 + Size int64 +} + +func stubResult() *QueryResult { + var queryResult QueryResult + + queryResult.Status = http.StatusOK + + return &queryResult +} + +func Query(param QueryParameters) *QueryResult { + var queryResult *QueryResult + + //queryResult = stubResult() + //return queryResult + + client := &http.Client{} + + operation, query, err := createQueryRequest(param) + if err != nil { + //fmt.Println("Create query error ", err.Error()) + queryResult = new(QueryResult) + queryResult.Status = http.StatusNotFound + return queryResult + } + + es := readESConfigs() + if es == nil { + queryResult = new(QueryResult) + queryResult.Status = http.StatusNotFound + return queryResult + } + + url := fmt.Sprintf("http://%s:%s/%s*/_search", es.Host, es.Port, es.Index) + request, err := http.NewRequest("GET", url, bytes.NewBuffer(query)) + if err != nil { + //fmt.Println("Create request error ", err.Error()) + queryResult = new(QueryResult) + queryResult.Status = http.StatusNotFound + return queryResult + } + request.Header.Set("Content-Type", "application/json; charset=utf-8") + + response, err := client.Do(request) + if err != nil { + //fmt.Println("Send request error ", err.Error()) + queryResult = new(QueryResult) + queryResult.Status = http.StatusNotFound + return queryResult + } + defer response.Body.Close() + + body, err := ioutil.ReadAll(response.Body) + if err != nil { + //fmt.Println("Read response error ", err.Error()) + queryResult = new(QueryResult) + queryResult.Status = http.StatusNotFound + return queryResult + } + + queryResult = parseQueryResult(operation, param, body, query) + + return queryResult +} diff --git a/pkg/simple/client/fluentbit/fluentbitcrdclient.go b/pkg/simple/client/fluentbit/fluentbitcrdclient.go new file mode 100644 index 
0000000000000000000000000000000000000000..4f3df73c3bca3f7a9ab5ac558f8530038c07babe --- /dev/null +++ b/pkg/simple/client/fluentbit/fluentbitcrdclient.go @@ -0,0 +1,289 @@ +/* +Copyright 2018 The KubeSphere Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package fluentbitclient + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" + "time" +) + +const ( + CRDPlural string = "fluentbits" + CRDGroup string = "logging.kubesphere.io" + CRDVersion string = "v1alpha1" + FullCRDName string = CRDPlural + "." 
+ CRDGroup +) + +// FluentBitList auto generated by the sdk +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +type FluentBitList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + Items []FluentBit `json:"items"` +} + +// FluentBit auto generated by the sdk +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +type FluentBit struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` + Spec FluentBitSpec `json:"spec"` + Status FluentBitStatus `json:"status,omitempty"` +} + +// FluentBitSpec holds the spec for the operator +type FluentBitSpec struct { + Service []Plugin `json:"service"` + Input []Plugin `json:"input"` + Filter []Plugin `json:"filter"` + Output []Plugin `json:"output"` + Settings []Plugin `json:"settings"` +} + +// FluentBitStatus holds the status info for the operator +type FluentBitStatus struct { + // Fill me +} + +// Plugin struct for fluent-bit plugins +type Plugin struct { + Type string `json:"type"` + Name string `json:"name"` + Parameters []Parameter `json:"parameters"` +} + +// Fluent-bit output plugins +type OutputPlugin struct { + Plugin + Id uint `json:"id"` + Enable bool `json:"enable"` + Updatetime time.Time `json:"updatetime"` +} + +// Parameter generic parameter type to handle values from different sources +type Parameter struct { + Name string `json:"name"` + ValueFrom *ValueFrom `json:"valueFrom,omitempty"` + Value string `json:"value"` +} + +// ValueFrom generic type to determine value origin +type ValueFrom struct { + SecretKeyRef KubernetesSecret `json:"secretKeyRef"` +} + +// KubernetesSecret is a ValueFrom type +type KubernetesSecret struct { + Name string `json:"name"` + Key string `json:"key"` + Namespace string `json:"namespace"` +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *FluentBit) DeepCopyInto(out *FluentBit) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + out.Status = in.Status + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FluentBit. +func (in *FluentBit) DeepCopy() *FluentBit { + if in == nil { + return nil + } + out := new(FluentBit) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *FluentBit) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FluentBitList) DeepCopyInto(out *FluentBitList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]FluentBit, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FluentBitList. +func (in *FluentBitList) DeepCopy() *FluentBitList { + if in == nil { + return nil + } + out := new(FluentBitList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *FluentBitList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FluentBitSpec) DeepCopyInto(out *FluentBitSpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FluentBitSpec. 
+func (in *FluentBitSpec) DeepCopy() *FluentBitSpec { + if in == nil { + return nil + } + out := new(FluentBitSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FluentBitStatus) DeepCopyInto(out *FluentBitStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FluentBitStatus. +func (in *FluentBitStatus) DeepCopy() *FluentBitStatus { + if in == nil { + return nil + } + out := new(FluentBitStatus) + in.DeepCopyInto(out) + return out +} + +// Create a Rest client with the new CRD Schema +var SchemeGroupVersion = schema.GroupVersion{Group: CRDGroup, Version: CRDVersion} + +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &FluentBit{}, + &FluentBitList{}, + ) + metav1.AddToGroupVersion(scheme, SchemeGroupVersion) + return nil +} + +func NewFluentbitCRDClient(cfg *rest.Config) (*rest.RESTClient, *runtime.Scheme, error) { + scheme := runtime.NewScheme() + SchemeBuilder := runtime.NewSchemeBuilder(addKnownTypes) + if err := SchemeBuilder.AddToScheme(scheme); err != nil { + return nil, nil, err + } + config := *cfg + config.GroupVersion = &SchemeGroupVersion + config.APIPath = "/apis" + config.ContentType = runtime.ContentTypeJSON + config.NegotiatedSerializer = serializer.DirectCodecFactory{ + CodecFactory: serializer.NewCodecFactory(scheme)} + + client, err := rest.RESTClientFor(&config) + if err != nil { + return nil, nil, err + } + return client, scheme, nil +} + +// This file implement all the (CRUD) client methods we need to access our CRD object + +func CrdClient(cl *rest.RESTClient, scheme *runtime.Scheme, namespace string) *crdclient { + return &crdclient{cl: cl, ns: namespace, plural: CRDPlural, + codec: runtime.NewParameterCodec(scheme)} +} + +type crdclient struct { + cl *rest.RESTClient + ns string + plural string + codec 
runtime.ParameterCodec +} + +func (f *crdclient) Create(obj *FluentBit) (*FluentBit, error) { + var result FluentBit + err := f.cl.Post(). + Namespace(f.ns).Resource(f.plural). + Body(obj).Do().Into(&result) + return &result, err +} + +func (f *crdclient) Update(name string, obj *FluentBit) (*FluentBit, error) { + var result FluentBit + err := f.cl.Put(). + Namespace(f.ns).Resource(f.plural). + Name(name).Body(obj).Do().Into(&result) + return &result, err +} + +func (f *crdclient) Delete(name string, options *metav1.DeleteOptions) error { + return f.cl.Delete(). + Namespace(f.ns).Resource(f.plural). + Name(name).Body(options).Do(). + Error() +} + +func (f *crdclient) Get(name string) (*FluentBit, error) { + var result FluentBit + err := f.cl.Get(). + Namespace(f.ns).Resource(f.plural). + Name(name).Do().Into(&result) + return &result, err +} + +func (f *crdclient) List(opts metav1.ListOptions) (*FluentBitList, error) { + var result FluentBitList + err := f.cl.Get(). + Namespace(f.ns).Resource(f.plural). + VersionedParams(&opts, f.codec). 
+		Do().Into(&result)
+	return &result, err
+}
+
+// NewListWatch returns a cache.ListWatch over FluentBit resources in the client's namespace.
+func (f *crdclient) NewListWatch() *cache.ListWatch {
+	return cache.NewListWatchFromClient(f.cl, f.plural, f.ns, fields.Everything())
+}
+
+// GetClientConfig returns a rest config; if the kubeconfig path is empty it assumes in-cluster config.
+func GetClientConfig(kubeconfig string) (*rest.Config, error) {
+	if kubeconfig != "" {
+		return clientcmd.BuildConfigFromFlags("", kubeconfig)
+	}
+	return rest.InClusterConfig()
+}
diff --git a/pkg/simple/client/prometheus/prometheusclient.go b/pkg/simple/client/prometheus/prometheusclient.go
index e4963c4a8614ce74cf58cf311c2644d2155fe033..0c3855a3893f33d497f8857d31983ea81d7e1afd 100644
--- a/pkg/simple/client/prometheus/prometheusclient.go
+++ b/pkg/simple/client/prometheus/prometheusclient.go
@@ -18,7 +18,6 @@
 package prometheus
 
 import (
-	"flag"
 	"io/ioutil"
 	"net/http"
 	"net/url"
@@ -26,54 +25,58 @@ import (
 	"strings"
 	"time"
 
+	"os"
+
 	"github.com/emicklei/go-restful"
 	"github.com/golang/glog"
 )
 
 const (
-	DefaultQueryStep    = "10m"
-	DefaultQueryTimeout = "10s"
-	RangeQueryType      = "query_range?"
-	DefaultQueryType    = "query?"
+	DefaultScheme          = "http"
+	DefaultPrometheusPort  = "9090"
+	PrometheusApiPath      = "/api/v1/"
+	DefaultQueryStep       = "10m"
+	DefaultQueryTimeout    = "10s"
+	RangeQueryType         = "query_range?"
+	DefaultQueryType       = "query?"
+	PrometheusAPIServerEnv = "PROMETHEUS_API_SERVER"
 )
 
-var (
-	PrometheusAPIEndpoint string
-)
+var PrometheusAPIServer = "prometheus-k8s.kubesphere-monitoring-system.svc"
+var PrometheusAPIEndpoint string
 
 func init() {
-	flag.StringVar(&PrometheusAPIEndpoint, "prometheus-endpoint", "http://prometheus-k8s.kubesphere-monitoring-system.svc:9090/api/v1/", "prometheus api endpoint")
+	if env := os.Getenv(PrometheusAPIServerEnv); env != "" {
+		PrometheusAPIServer = env
+	}
+	PrometheusAPIEndpoint = DefaultScheme + "://" + PrometheusAPIServer + ":" + DefaultPrometheusPort + PrometheusApiPath
 }
 
 type MonitoringRequestParams struct {
-	Params           url.Values
-	QueryType        string
-	SortMetricName   string
-	SortType         string
-	PageNum          string
-	LimitNum         string
-	Tp               string
-	MetricsFilter    string
-	NodesFilter      string
-	WsFilter         string
-	NsFilter         string
-	PodsFilter       string
-	ContainersFilter string
-	MetricsName      string
-	WorkloadName     string
-	WlFilter         string
-	NodeId           string
-	WsName           string
-	NsName           string
-	PodName          string
-	ContainerName    string
-	WorkloadKind     string
+	Params          url.Values
+	QueryType       string
+	SortMetricName  string
+	SortType        string
+	PageNum         string
+	LimitNum        string
+	Tp              string
+	MetricsFilter   string
+	ResourcesFilter string
+	MetricsName     string
+	WorkloadName    string
+	NodeId          string
+	WsName          string
+	NsName          string
+	PodName         string
+	ContainerName   string
+	WorkloadKind    string
 }
 
+var client = &http.Client{}
+
 func SendMonitoringRequest(queryType string, params string) string {
 	epurl := PrometheusAPIEndpoint + queryType + params
-
-	response, err := http.DefaultClient.Get(epurl)
+	response, err := client.Get(epurl)
 	if err != nil {
 		glog.Error(err)
 	} else {
@@ -103,15 +106,10 @@
 	tp := strings.Trim(request.QueryParameter("type"), " ")
 
 	metricsFilter := strings.Trim(request.QueryParameter("metrics_filter"), " ")
-	nodesFilter := strings.Trim(request.QueryParameter("nodes_filter"), " ")
-	wsFilter := strings.Trim(request.QueryParameter("workspaces_filter"), " ")
-	nsFilter := strings.Trim(request.QueryParameter("namespaces_filter"), " ")
-	wlFilter := strings.Trim(request.QueryParameter("workloads_filter"), " ")
-	podsFilter := strings.Trim(request.QueryParameter("pods_filter"), " ")
-	containersFilter := strings.Trim(request.QueryParameter("containers_filter"), " ")
+	resourcesFilter := strings.Trim(request.QueryParameter("resources_filter"), " ")
 
 	metricsName := strings.Trim(request.QueryParameter("metrics_name"), " ")
-	workloadName := strings.Trim(request.QueryParameter("workload_name"), " ")
+	workloadName := strings.Trim(request.PathParameter("workload"), " ")
 
 	nodeId := strings.Trim(request.PathParameter("node"), " ")
 	wsName := strings.Trim(request.PathParameter("workspace"), " ")
@@ -121,26 +119,21 @@
 	workloadKind := strings.Trim(request.PathParameter("workload_kind"), " ")
 
 	var requestParams = MonitoringRequestParams{
-		SortMetricName:   sortMetricName,
-		SortType:         sortType,
-		PageNum:          pageNum,
-		LimitNum:         limitNum,
-		Tp:               tp,
-		MetricsFilter:    metricsFilter,
-		NodesFilter:      nodesFilter,
-		WsFilter:         wsFilter,
-		NsFilter:         nsFilter,
-		PodsFilter:       podsFilter,
-		ContainersFilter: containersFilter,
-		MetricsName:      metricsName,
-		WorkloadName:     workloadName,
-		WlFilter:         wlFilter,
-		NodeId:           nodeId,
-		WsName:           wsName,
-		NsName:           nsName,
-		PodName:          podName,
-		ContainerName:    containerName,
-		WorkloadKind:     workloadKind,
+		SortMetricName:  sortMetricName,
+		SortType:        sortType,
+		PageNum:         pageNum,
+		LimitNum:        limitNum,
+		Tp:              tp,
+		MetricsFilter:   metricsFilter,
+		ResourcesFilter: resourcesFilter,
+		MetricsName:     metricsName,
+		WorkloadName:    workloadName,
+		NodeId:          nodeId,
+		WsName:          wsName,
+		NsName:          nsName,
+		PodName:         podName,
+		ContainerName:   containerName,
+		WorkloadKind:    workloadKind,
 	}
 
 	if timeout == "" {