diff --git a/cmd/ks-apiserver/app/server.go b/cmd/ks-apiserver/app/server.go index 33d91a216a3393f744f4bee3ce56c0e4dbe78e40..b4d1ba45e235d2f8f3d96ddecf80c8338f8006f9 100644 --- a/cmd/ks-apiserver/app/server.go +++ b/cmd/ks-apiserver/app/server.go @@ -31,9 +31,6 @@ import ( "kubesphere.io/kubesphere/pkg/informers" logging "kubesphere.io/kubesphere/pkg/models/log" "kubesphere.io/kubesphere/pkg/signals" - es "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" - fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit" - "kubesphere.io/kubesphere/pkg/simple/client/mysql" "log" "net/http" ) @@ -116,30 +113,18 @@ func initializeKialiConfig(s *options.ServerRunOptions) { func initializeESClientConfig() { - var outputs []logging.OutputDBBinding - var configs *es.ESConfigs - - db := mysql.Client() - if !db.HasTable(&logging.OutputDBBinding{}) { - // Panic - log.Print("Flyway migration is not completed") - } - - err := db.Find(&outputs).Error + // List all outputs + outputs,err := logging.GetFluentbitOutputFromConfigMap() if err != nil { - log.Printf("get logging config failed. Error: %v", err) + glog.Errorln(err) return } - // Retrieve es-type output from db - var params []fb.Parameter + // Iterate the outputs to get elasticsearch configs for _, output := range outputs { - err := jsonIter.UnmarshalFromString(output.Parameters, ¶ms) - if err == nil { - if configs = logging.ParseEsOutputParams(params); configs != nil { - configs.WriteESConfigs() - return - } + if configs := logging.ParseEsOutputParams(output.Parameters); configs != nil { + configs.WriteESConfigs() + return } } } diff --git a/pkg/apis/logging/v1alpha2/register.go b/pkg/apis/logging/v1alpha2/register.go index 2247e7f764a4b2f0e944954398444dd7849a5cb2..0e1672b35c96d2bbb105aaaa61b247b2adce6614 100644 --- a/pkg/apis/logging/v1alpha2/register.go +++ b/pkg/apis/logging/v1alpha2/register.go @@ -214,4 +214,4 @@ func addWebService(c *restful.Container) error { c.Add(ws) return nil -} +} \ No newline at end of file diff --git a/pkg/apiserver/logging/logging.go b/pkg/apiserver/logging/logging.go index 68ec7394861d76a313e910be1820b3be8493ffc6..049a1282b1a3bfa5c6b8f53fb1978d3a2a8a3365 100644 --- a/pkg/apiserver/logging/logging.go +++ b/pkg/apiserver/logging/logging.go @@ -91,6 +91,7 @@ func LoggingInsertFluentbitOutput(request *restful.Request, response *restful.Re err := request.ReadEntity(&output) if err != nil { + glog.Errorln(err) res = &log.FluentbitOutputsResult{Status: http.StatusBadRequest} } else { res = log.FluentbitOutputInsert(output) @@ -104,14 +105,10 @@ func LoggingUpdateFluentbitOutput(request *restful.Request, response *restful.Re var output fb.OutputPlugin id := request.PathParameter("output") - _, err := strconv.ParseUint(id, 10, 64) - if err != nil { - res := &log.FluentbitOutputsResult{Status: http.StatusBadRequest} - response.WriteAsJson(res) - return - } - err = request.ReadEntity(&output) + + err := request.ReadEntity(&output) if err != nil { + glog.Errorln(err) res := &log.FluentbitOutputsResult{Status: http.StatusBadRequest} response.WriteAsJson(res) return @@ -126,12 +123,7 @@ func LoggingDeleteFluentbitOutput(request *restful.Request, response *restful.Re var res *log.FluentbitOutputsResult id := request.PathParameter("output") - _, err := strconv.ParseUint(id, 10, 64) - if err != nil { - res = &log.FluentbitOutputsResult{Status: http.StatusBadRequest} - } else { - res = log.FluentbitOutputDelete(id) - } + res = log.FluentbitOutputDelete(id) response.WriteAsJson(res) } diff --git a/pkg/db/schema/V0__Logging_output_configs.sql b/pkg/db/schema/V0__Logging_output_configs.sql deleted file mode 100644 index 099f5f730509a8649d5ea870888e4a6e49ac21fe..0000000000000000000000000000000000000000 --- a/pkg/db/schema/V0__Logging_output_configs.sql +++ /dev/null @@ -1,14 +0,0 @@ -CREATE TABLE output_db_bindings - ( - id INT NOT NULL AUTO_INCREMENT, - type TEXT NOT NULL, - name TEXT NOT NULL, - parameters TEXT NOT NULL, - internal BOOLEAN, - enable BOOLEAN NOT NULL, - updatetime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (id), - UNIQUE (id) - ); - -INSERT INTO output_db_bindings (type, name, parameters, enable) VALUES ('fluentbit_output', 'fluentbit-output', '[{"name": "Name", "value": "es"}, {"name": "Match", "value": "kube.*"}, {"name": "Host", "value": "elasticsearch-logging-data.kubesphere-logging-system.svc"}, {"name": "Port", "value": "9200"}, {"name": "Logstash_Format", "value": "On"}, {"name": "Replace_Dots", "value": "on"}, {"name": "Retry_Limit", "value": "False"}, {"name": "Type", "value": "flb_type"}, {"name": "Time_Key", "value": "@timestamp"}, {"name": "Logstash_Prefix", "value": "logstash"} ]', '1'); \ No newline at end of file diff --git a/pkg/models/log/constants.go b/pkg/models/log/constants.go index cfb8efcce10dc72379d87a07d83e1db6549ed06d..a184b6f64615f0551ee310e0e6bfb13aed7f8b34 100644 --- a/pkg/models/log/constants.go +++ b/pkg/models/log/constants.go @@ -27,4 +27,4 @@ const ( QueryLevelWorkload QueryLevelPod QueryLevelContainer -) +) \ No newline at end of file diff --git a/pkg/models/log/logcrd.go b/pkg/models/log/logcrd.go index 923077989f1a73074dd9ea9a5a6ecd61680d8b3d..3abce8e7de328d80e9da2c7b3850f432f1b98001 100644 --- a/pkg/models/log/logcrd.go +++ b/pkg/models/log/logcrd.go @@ -17,22 +17,31 @@ limitations under the License. package log import ( - "github.com/jinzhu/gorm" + _ "github.com/go-sql-driver/mysql" + "github.com/golang/glog" + "github.com/google/uuid" "github.com/json-iterator/go" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "kubesphere.io/kubesphere/pkg/informers" es "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit" - "kubesphere.io/kubesphere/pkg/simple/client/mysql" "net/http" "strings" "time" - - _ "github.com/go-sql-driver/mysql" ) var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary +const ( + ConfigMapName = "fluent-bit-output-config" + ConfigMapData = "outputs" + LoggingNamespace = "kubesphere-logging-system" +) + func createCRDClientSet() (*rest.RESTClient, *runtime.Scheme, error) { config, err := fb.GetClientConfig("") if err != nil { @@ -84,7 +93,7 @@ func FluentbitFiltersQuery() *FluentbitFiltersResult { } // Create a CRD client interface - crdclient := fb.CrdClient(crdcs, scheme, "kubesphere-logging-system") + crdclient := fb.CrdClient(crdcs, scheme, LoggingNamespace) item, err := crdclient.Get("fluent-bit") if err != nil { @@ -193,7 +202,7 @@ func FluentbitFiltersUpdate(filters *[]FluentbitFilter) *FluentbitFiltersResult } // Create a CRD client interface - crdclient := fb.CrdClient(crdcs, scheme, "kubesphere-logging-system") + crdclient := fb.CrdClient(crdcs, scheme, LoggingNamespace) var item *fb.FluentBit var err_read error @@ -221,34 +230,13 @@ func FluentbitFiltersUpdate(filters *[]FluentbitFilter) *FluentbitFiltersResult func FluentbitOutputsQuery() *FluentbitOutputsResult { var result FluentbitOutputsResult - // Retrieve outputs from DB - db := mysql.Client() - - var outputs []OutputDBBinding - - err := db.Find(&outputs).Error + outputs, err := GetFluentbitOutputFromConfigMap() if err != nil { - result.Status = http.StatusInternalServerError + result.Status = http.StatusNotFound return &result } - var unmarshaledOutputs []fb.OutputPlugin - - for _, output := range outputs { - var params []fb.Parameter - - err = jsonIter.UnmarshalFromString(output.Parameters, ¶ms) - if err != nil { - result.Status = http.StatusInternalServerError - return &result - } - - unmarshaledOutputs = append(unmarshaledOutputs, - fb.OutputPlugin{Plugin: fb.Plugin{Type: output.Type, Name: output.Name, Parameters: params}, - Id: output.Id, Enable: output.Enable, Updatetime: output.Updatetime}) - } - - result.Outputs = unmarshaledOutputs + result.Outputs = outputs result.Status = http.StatusOK return &result @@ -257,25 +245,29 @@ func FluentbitOutputsQuery() *FluentbitOutputsResult { func FluentbitOutputInsert(output fb.OutputPlugin) *FluentbitOutputsResult { var result FluentbitOutputsResult - params, err := jsoniter.MarshalToString(output.Parameters) - if err != nil { - result.Status = http.StatusBadRequest - return &result + // 1. Update ConfigMap + var outputs []fb.OutputPlugin + outputs, err := GetFluentbitOutputFromConfigMap() + if err != nil { + // If the ConfigMap doesn't exist, a new one will be created later + glog.Errorln(err) } - // 1. Update DB - db := mysql.Client() + // When adding a new output for the first time, one should always set it disabled + output.Enable = false + output.Id = uuid.New().String() + output.Updatetime = time.Now() - marshaledOutput := OutputDBBinding{Type: output.Type, Name: output.Name, - Parameters: params, Enable: output.Enable, Updatetime: time.Now()} - err = db.Create(&marshaledOutput).Error + outputs = append(outputs, output) + + err = updateFluentbitOutputConfigMap(outputs) if err != nil { result.Status = http.StatusInternalServerError return &result } - // 2. Keep CRD in inline with DB - err = syncFluentbitCRDOutputWithDB(db) + // 2. Keep CRD in inline with ConfigMap + err = syncFluentbitCRDOutputWithConfigMap(outputs) if err != nil { result.Status = http.StatusInternalServerError return &result @@ -294,37 +286,40 @@ func FluentbitOutputInsert(output fb.OutputPlugin) *FluentbitOutputsResult { func FluentbitOutputUpdate(output fb.OutputPlugin, id string) *FluentbitOutputsResult { var result FluentbitOutputsResult - // 1. Update DB - db := mysql.Client() - - params, err := jsoniter.MarshalToString(output.Parameters) + // 1. Update ConfigMap + var outputs []fb.OutputPlugin + outputs, err := GetFluentbitOutputFromConfigMap() if err != nil { - result.Status = http.StatusBadRequest - return &result + // If the ConfigMap doesn't exist, a new one will be created later + glog.Errorln(err) } - var marshaledOutput OutputDBBinding - err = db.Where("id = ?", id).First(&marshaledOutput).Error - if err != nil { - result.Status = http.StatusInternalServerError + index := 0 + for _, output := range outputs { + if output.Id == id { + break + } + index++ + } + + if index >= len(outputs) { + result.Status = http.StatusNotFound return &result } - marshaledOutput.Name = output.Name - marshaledOutput.Type = output.Type - marshaledOutput.Parameters = params - marshaledOutput.Enable = output.Enable + output.Updatetime = time.Now() + outputs = append(append(outputs[:index], outputs[index+1:]...), output) - err = db.Save(&marshaledOutput).Error + err = updateFluentbitOutputConfigMap(outputs) if err != nil { result.Status = http.StatusInternalServerError return &result } - // 2. Keep CRD in inline with DB - err = syncFluentbitCRDOutputWithDB(db) + // 2. Keep CRD in inline with ConfigMap + err = syncFluentbitCRDOutputWithConfigMap(outputs) if err != nil { - result.Status = http.StatusBadRequest + result.Status = http.StatusInternalServerError return &result } @@ -341,19 +336,35 @@ func FluentbitOutputUpdate(output fb.OutputPlugin, id string) *FluentbitOutputsR func FluentbitOutputDelete(id string) *FluentbitOutputsResult { var result FluentbitOutputsResult - // 1. Remove the record from DB - db := mysql.Client() + // 1. Update ConfigMap + // If the ConfigMap doesn't exist, a new one will be created + outputs, _ := GetFluentbitOutputFromConfigMap() + + index := 0 + for _, output := range outputs { + if output.Id == id { + break + } + index++ + } + + if index >= len(outputs) { + result.Status = http.StatusNotFound + return &result + } + + outputs = append(outputs[:index], outputs[index+1:]...) - err := db.Where("id = ?", id).Delete(&OutputDBBinding{}).Error + err := updateFluentbitOutputConfigMap(outputs) if err != nil { result.Status = http.StatusInternalServerError return &result } // 2. Keep CRD in inline with DB - err = syncFluentbitCRDOutputWithDB(db) + err = syncFluentbitCRDOutputWithConfigMap(outputs) if err != nil { - result.Status = http.StatusBadRequest + result.Status = http.StatusInternalServerError return &result } @@ -361,29 +372,89 @@ func FluentbitOutputDelete(id string) *FluentbitOutputsResult { return &result } -func syncFluentbitCRDOutputWithDB(db *gorm.DB) error { - var outputs []OutputDBBinding +func GetFluentbitOutputFromConfigMap() ([]fb.OutputPlugin, error) { + configMap, err := informers.SharedInformerFactory().Core().V1().ConfigMaps().Lister().ConfigMaps(LoggingNamespace).Get(ConfigMapName) + if err != nil { + return nil, err + } + + data := configMap.Data[ConfigMapData] + + var outputs []fb.OutputPlugin + if err = jsonIter.UnmarshalFromString(data, &outputs); err != nil { + return nil, err + } + + return outputs, nil +} + +func updateFluentbitOutputConfigMap(outputs []fb.OutputPlugin) error { - err := db.Where("enable is true").Find(&outputs).Error + var data string + data, err := jsonIter.MarshalToString(outputs) if err != nil { + glog.Errorln(err) return err } - var unmarshaledOutputs []fb.Plugin + // Update the ConfigMap + config, err := rest.InClusterConfig() + if err != nil { + glog.Errorln(err) + return err + } - for _, output := range outputs { - var params []fb.Parameter + // Creates the clientset + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + glog.Errorln(err) + return err + } - err = jsonIter.UnmarshalFromString(output.Parameters, ¶ms) + configMapClient := clientset.CoreV1().ConfigMaps(LoggingNamespace) + + configMap, err := configMapClient.Get(ConfigMapName, metav1.GetOptions{}) + if err != nil { + + // If the ConfigMap doesn't exist, create a new one + newConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: ConfigMapName, + }, + Data: map[string]string{ConfigMapData: data}, + } + + _, err = configMapClient.Create(newConfigMap) + if err != nil { + glog.Errorln(err) + return err + } + } else { + + // update + configMap.Data = map[string]string{ConfigMapData: data} + _, err = configMapClient.Update(configMap) if err != nil { + glog.Errorln(err) return err } + } + + return nil +} - unmarshaledOutputs = append(unmarshaledOutputs, fb.Plugin{Type: output.Type, Name: output.Name, Parameters: params}) +func syncFluentbitCRDOutputWithConfigMap(outputs []fb.OutputPlugin) error { + + var enabledOutputs []fb.Plugin + for _, output := range outputs { + if output.Enable { + enabledOutputs = append(enabledOutputs, fb.Plugin{Type: output.Type, Name: output.Name, Parameters: output.Parameters}) + } } + // Empty output is not allowed, must specify a null-type output - if len(unmarshaledOutputs) == 0 { - unmarshaledOutputs = []fb.Plugin{ + if len(enabledOutputs) == 0 { + enabledOutputs = []fb.Plugin{ { Type: "fluentbit_output", Name: "fluentbit-output-null", @@ -407,14 +478,14 @@ func syncFluentbitCRDOutputWithDB(db *gorm.DB) error { } // Create a CRD client interface - crdclient := fb.CrdClient(crdcs, scheme, "kubesphere-logging-system") + crdclient := fb.CrdClient(crdcs, scheme, LoggingNamespace) fluentbit, err := crdclient.Get("fluent-bit") if err != nil { return err } - fluentbit.Spec.Output = unmarshaledOutputs + fluentbit.Spec.Output = enabledOutputs _, err = crdclient.Update("fluent-bit", fluentbit) if err != nil { return err diff --git a/pkg/models/log/types.go b/pkg/models/log/types.go index 072b3b13e4346050018af9f5df1aabb2f43857d9..c2356d9866fa6684726de62445cfc8845577f5eb 100644 --- a/pkg/models/log/types.go +++ b/pkg/models/log/types.go @@ -20,7 +20,6 @@ package log import ( fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit" - "time" ) type FluentbitCRDResult struct { @@ -52,13 +51,3 @@ type FluentbitOutputsResult struct { Status int `json:"status"` Outputs []fb.OutputPlugin `json:"outputs,omitempty"` } - -type OutputDBBinding struct { - Id uint `gorm:"primary_key;auto_increment;unique"` - Type string `gorm:"not null"` - Name string `gorm:"not null"` - Parameters string `gorm:"not null"` - Internal bool - Enable bool `gorm:"not null"` - Updatetime time.Time `gorm:"not null"` -} diff --git a/pkg/simple/client/elasticsearch/esclient.go b/pkg/simple/client/elasticsearch/esclient.go index b82f1db98edf6ff84eb9bcbc805073b6586fd5fa..25baefc12beec7c4de450714917028a6ace69991 100644 --- a/pkg/simple/client/elasticsearch/esclient.go +++ b/pkg/simple/client/elasticsearch/esclient.go @@ -16,6 +16,7 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/golang/glog" "io/ioutil" "net/http" "strconv" @@ -608,7 +609,7 @@ func Query(param QueryParameters) *QueryResult { url := fmt.Sprintf("http://%s:%s/%s*/_search", es.Host, es.Port, es.Index) request, err := http.NewRequest("GET", url, bytes.NewBuffer(query)) if err != nil { - //fmt.Println("Create request error ", err.Error()) + glog.Errorln(err) queryResult = new(QueryResult) queryResult.Status = http.StatusNotFound return queryResult @@ -617,7 +618,7 @@ func Query(param QueryParameters) *QueryResult { response, err := client.Do(request) if err != nil { - //fmt.Println("Send request error ", err.Error()) + glog.Errorln(err) queryResult = new(QueryResult) queryResult.Status = http.StatusNotFound return queryResult @@ -626,7 +627,7 @@ func Query(param QueryParameters) *QueryResult { body, err := ioutil.ReadAll(response.Body) if err != nil { - //fmt.Println("Read response error ", err.Error()) + glog.Errorln(err) queryResult = new(QueryResult) queryResult.Status = http.StatusNotFound return queryResult diff --git a/pkg/simple/client/fluentbit/fluentbitcrdclient.go b/pkg/simple/client/fluentbit/fluentbitcrdclient.go index 4f3df73c3bca3f7a9ab5ac558f8530038c07babe..86cc6aa4150fd1d7c52f94a160ea1fdfc0c11f0f 100644 --- a/pkg/simple/client/fluentbit/fluentbitcrdclient.go +++ b/pkg/simple/client/fluentbit/fluentbitcrdclient.go @@ -72,9 +72,9 @@ type Plugin struct { // Fluent-bit output plugins type OutputPlugin struct { Plugin - Id uint `json:"id"` + Id string `json:"id"` Enable bool `json:"enable"` - Updatetime time.Time `json:"updatetime"` + Updatetime time.Time `json:"updatetime,omitempty"` } // Parameter generic parameter type to handle values from different sources