提交 74325fa8 编写于 作者: H huanggze

use configmap to store log output configs

Signed-off-by: Nhuanggze <loganhuang@yunify.com>
上级 8edf1ca7
...@@ -31,9 +31,6 @@ import ( ...@@ -31,9 +31,6 @@ import (
"kubesphere.io/kubesphere/pkg/informers" "kubesphere.io/kubesphere/pkg/informers"
logging "kubesphere.io/kubesphere/pkg/models/log" logging "kubesphere.io/kubesphere/pkg/models/log"
"kubesphere.io/kubesphere/pkg/signals" "kubesphere.io/kubesphere/pkg/signals"
es "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch"
fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit"
"kubesphere.io/kubesphere/pkg/simple/client/mysql"
"log" "log"
"net/http" "net/http"
) )
...@@ -116,30 +113,18 @@ func initializeKialiConfig(s *options.ServerRunOptions) { ...@@ -116,30 +113,18 @@ func initializeKialiConfig(s *options.ServerRunOptions) {
func initializeESClientConfig() { func initializeESClientConfig() {
var outputs []logging.OutputDBBinding // List all outputs
var configs *es.ESConfigs outputs,err := logging.GetFluentbitOutputFromConfigMap()
db := mysql.Client()
if !db.HasTable(&logging.OutputDBBinding{}) {
// Panic
log.Print("Flyway migration is not completed")
}
err := db.Find(&outputs).Error
if err != nil { if err != nil {
log.Printf("get logging config failed. Error: %v", err) glog.Errorln(err)
return return
} }
// Retrieve es-type output from db // Iterate the outputs to get elasticsearch configs
var params []fb.Parameter
for _, output := range outputs { for _, output := range outputs {
err := jsonIter.UnmarshalFromString(output.Parameters, &params) if configs := logging.ParseEsOutputParams(output.Parameters); configs != nil {
if err == nil { configs.WriteESConfigs()
if configs = logging.ParseEsOutputParams(params); configs != nil { return
configs.WriteESConfigs()
return
}
} }
} }
} }
......
...@@ -214,4 +214,4 @@ func addWebService(c *restful.Container) error { ...@@ -214,4 +214,4 @@ func addWebService(c *restful.Container) error {
c.Add(ws) c.Add(ws)
return nil return nil
} }
\ No newline at end of file
...@@ -91,6 +91,7 @@ func LoggingInsertFluentbitOutput(request *restful.Request, response *restful.Re ...@@ -91,6 +91,7 @@ func LoggingInsertFluentbitOutput(request *restful.Request, response *restful.Re
err := request.ReadEntity(&output) err := request.ReadEntity(&output)
if err != nil { if err != nil {
glog.Errorln(err)
res = &log.FluentbitOutputsResult{Status: http.StatusBadRequest} res = &log.FluentbitOutputsResult{Status: http.StatusBadRequest}
} else { } else {
res = log.FluentbitOutputInsert(output) res = log.FluentbitOutputInsert(output)
...@@ -104,14 +105,10 @@ func LoggingUpdateFluentbitOutput(request *restful.Request, response *restful.Re ...@@ -104,14 +105,10 @@ func LoggingUpdateFluentbitOutput(request *restful.Request, response *restful.Re
var output fb.OutputPlugin var output fb.OutputPlugin
id := request.PathParameter("output") id := request.PathParameter("output")
_, err := strconv.ParseUint(id, 10, 64)
if err != nil { err := request.ReadEntity(&output)
res := &log.FluentbitOutputsResult{Status: http.StatusBadRequest}
response.WriteAsJson(res)
return
}
err = request.ReadEntity(&output)
if err != nil { if err != nil {
glog.Errorln(err)
res := &log.FluentbitOutputsResult{Status: http.StatusBadRequest} res := &log.FluentbitOutputsResult{Status: http.StatusBadRequest}
response.WriteAsJson(res) response.WriteAsJson(res)
return return
...@@ -126,12 +123,7 @@ func LoggingDeleteFluentbitOutput(request *restful.Request, response *restful.Re ...@@ -126,12 +123,7 @@ func LoggingDeleteFluentbitOutput(request *restful.Request, response *restful.Re
var res *log.FluentbitOutputsResult var res *log.FluentbitOutputsResult
id := request.PathParameter("output") id := request.PathParameter("output")
_, err := strconv.ParseUint(id, 10, 64) res = log.FluentbitOutputDelete(id)
if err != nil {
res = &log.FluentbitOutputsResult{Status: http.StatusBadRequest}
} else {
res = log.FluentbitOutputDelete(id)
}
response.WriteAsJson(res) response.WriteAsJson(res)
} }
......
CREATE TABLE output_db_bindings
(
id INT NOT NULL AUTO_INCREMENT,
type TEXT NOT NULL,
name TEXT NOT NULL,
parameters TEXT NOT NULL,
internal BOOLEAN,
enable BOOLEAN NOT NULL,
updatetime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE (id)
);
INSERT INTO output_db_bindings (type, name, parameters, enable) VALUES ('fluentbit_output', 'fluentbit-output', '[{"name": "Name", "value": "es"}, {"name": "Match", "value": "kube.*"}, {"name": "Host", "value": "elasticsearch-logging-data.kubesphere-logging-system.svc"}, {"name": "Port", "value": "9200"}, {"name": "Logstash_Format", "value": "On"}, {"name": "Replace_Dots", "value": "on"}, {"name": "Retry_Limit", "value": "False"}, {"name": "Type", "value": "flb_type"}, {"name": "Time_Key", "value": "@timestamp"}, {"name": "Logstash_Prefix", "value": "logstash"} ]', '1');
\ No newline at end of file
...@@ -27,4 +27,4 @@ const ( ...@@ -27,4 +27,4 @@ const (
QueryLevelWorkload QueryLevelWorkload
QueryLevelPod QueryLevelPod
QueryLevelContainer QueryLevelContainer
) )
\ No newline at end of file
...@@ -17,22 +17,31 @@ limitations under the License. ...@@ -17,22 +17,31 @@ limitations under the License.
package log package log
import ( import (
"github.com/jinzhu/gorm" _ "github.com/go-sql-driver/mysql"
"github.com/golang/glog"
"github.com/google/uuid"
"github.com/json-iterator/go" "github.com/json-iterator/go"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"kubesphere.io/kubesphere/pkg/informers"
es "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" es "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch"
fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit" fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit"
"kubesphere.io/kubesphere/pkg/simple/client/mysql"
"net/http" "net/http"
"strings" "strings"
"time" "time"
_ "github.com/go-sql-driver/mysql"
) )
var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary
const (
ConfigMapName = "fluent-bit-output-config"
ConfigMapData = "outputs"
LoggingNamespace = "kubesphere-logging-system"
)
func createCRDClientSet() (*rest.RESTClient, *runtime.Scheme, error) { func createCRDClientSet() (*rest.RESTClient, *runtime.Scheme, error) {
config, err := fb.GetClientConfig("") config, err := fb.GetClientConfig("")
if err != nil { if err != nil {
...@@ -84,7 +93,7 @@ func FluentbitFiltersQuery() *FluentbitFiltersResult { ...@@ -84,7 +93,7 @@ func FluentbitFiltersQuery() *FluentbitFiltersResult {
} }
// Create a CRD client interface // Create a CRD client interface
crdclient := fb.CrdClient(crdcs, scheme, "kubesphere-logging-system") crdclient := fb.CrdClient(crdcs, scheme, LoggingNamespace)
item, err := crdclient.Get("fluent-bit") item, err := crdclient.Get("fluent-bit")
if err != nil { if err != nil {
...@@ -193,7 +202,7 @@ func FluentbitFiltersUpdate(filters *[]FluentbitFilter) *FluentbitFiltersResult ...@@ -193,7 +202,7 @@ func FluentbitFiltersUpdate(filters *[]FluentbitFilter) *FluentbitFiltersResult
} }
// Create a CRD client interface // Create a CRD client interface
crdclient := fb.CrdClient(crdcs, scheme, "kubesphere-logging-system") crdclient := fb.CrdClient(crdcs, scheme, LoggingNamespace)
var item *fb.FluentBit var item *fb.FluentBit
var err_read error var err_read error
...@@ -221,34 +230,13 @@ func FluentbitFiltersUpdate(filters *[]FluentbitFilter) *FluentbitFiltersResult ...@@ -221,34 +230,13 @@ func FluentbitFiltersUpdate(filters *[]FluentbitFilter) *FluentbitFiltersResult
func FluentbitOutputsQuery() *FluentbitOutputsResult { func FluentbitOutputsQuery() *FluentbitOutputsResult {
var result FluentbitOutputsResult var result FluentbitOutputsResult
// Retrieve outputs from DB outputs, err := GetFluentbitOutputFromConfigMap()
db := mysql.Client()
var outputs []OutputDBBinding
err := db.Find(&outputs).Error
if err != nil { if err != nil {
result.Status = http.StatusInternalServerError result.Status = http.StatusNotFound
return &result return &result
} }
var unmarshaledOutputs []fb.OutputPlugin result.Outputs = outputs
for _, output := range outputs {
var params []fb.Parameter
err = jsonIter.UnmarshalFromString(output.Parameters, &params)
if err != nil {
result.Status = http.StatusInternalServerError
return &result
}
unmarshaledOutputs = append(unmarshaledOutputs,
fb.OutputPlugin{Plugin: fb.Plugin{Type: output.Type, Name: output.Name, Parameters: params},
Id: output.Id, Enable: output.Enable, Updatetime: output.Updatetime})
}
result.Outputs = unmarshaledOutputs
result.Status = http.StatusOK result.Status = http.StatusOK
return &result return &result
...@@ -257,25 +245,29 @@ func FluentbitOutputsQuery() *FluentbitOutputsResult { ...@@ -257,25 +245,29 @@ func FluentbitOutputsQuery() *FluentbitOutputsResult {
func FluentbitOutputInsert(output fb.OutputPlugin) *FluentbitOutputsResult { func FluentbitOutputInsert(output fb.OutputPlugin) *FluentbitOutputsResult {
var result FluentbitOutputsResult var result FluentbitOutputsResult
params, err := jsoniter.MarshalToString(output.Parameters) // 1. Update ConfigMap
if err != nil { var outputs []fb.OutputPlugin
result.Status = http.StatusBadRequest outputs, err := GetFluentbitOutputFromConfigMap()
return &result if err != nil {
// If the ConfigMap doesn't exist, a new one will be created later
glog.Errorln(err)
} }
// 1. Update DB // When adding a new output for the first time, one should always set it disabled
db := mysql.Client() output.Enable = false
output.Id = uuid.New().String()
output.Updatetime = time.Now()
marshaledOutput := OutputDBBinding{Type: output.Type, Name: output.Name, outputs = append(outputs, output)
Parameters: params, Enable: output.Enable, Updatetime: time.Now()}
err = db.Create(&marshaledOutput).Error err = updateFluentbitOutputConfigMap(outputs)
if err != nil { if err != nil {
result.Status = http.StatusInternalServerError result.Status = http.StatusInternalServerError
return &result return &result
} }
// 2. Keep CRD in inline with DB // 2. Keep CRD in inline with ConfigMap
err = syncFluentbitCRDOutputWithDB(db) err = syncFluentbitCRDOutputWithConfigMap(outputs)
if err != nil { if err != nil {
result.Status = http.StatusInternalServerError result.Status = http.StatusInternalServerError
return &result return &result
...@@ -294,37 +286,40 @@ func FluentbitOutputInsert(output fb.OutputPlugin) *FluentbitOutputsResult { ...@@ -294,37 +286,40 @@ func FluentbitOutputInsert(output fb.OutputPlugin) *FluentbitOutputsResult {
func FluentbitOutputUpdate(output fb.OutputPlugin, id string) *FluentbitOutputsResult { func FluentbitOutputUpdate(output fb.OutputPlugin, id string) *FluentbitOutputsResult {
var result FluentbitOutputsResult var result FluentbitOutputsResult
// 1. Update DB // 1. Update ConfigMap
db := mysql.Client() var outputs []fb.OutputPlugin
outputs, err := GetFluentbitOutputFromConfigMap()
params, err := jsoniter.MarshalToString(output.Parameters)
if err != nil { if err != nil {
result.Status = http.StatusBadRequest // If the ConfigMap doesn't exist, a new one will be created later
return &result glog.Errorln(err)
} }
var marshaledOutput OutputDBBinding index := 0
err = db.Where("id = ?", id).First(&marshaledOutput).Error for _, output := range outputs {
if err != nil { if output.Id == id {
result.Status = http.StatusInternalServerError break
}
index++
}
if index >= len(outputs) {
result.Status = http.StatusNotFound
return &result return &result
} }
marshaledOutput.Name = output.Name output.Updatetime = time.Now()
marshaledOutput.Type = output.Type outputs = append(append(outputs[:index], outputs[index+1:]...), output)
marshaledOutput.Parameters = params
marshaledOutput.Enable = output.Enable
err = db.Save(&marshaledOutput).Error err = updateFluentbitOutputConfigMap(outputs)
if err != nil { if err != nil {
result.Status = http.StatusInternalServerError result.Status = http.StatusInternalServerError
return &result return &result
} }
// 2. Keep CRD in inline with DB // 2. Keep CRD in inline with ConfigMap
err = syncFluentbitCRDOutputWithDB(db) err = syncFluentbitCRDOutputWithConfigMap(outputs)
if err != nil { if err != nil {
result.Status = http.StatusBadRequest result.Status = http.StatusInternalServerError
return &result return &result
} }
...@@ -341,19 +336,35 @@ func FluentbitOutputUpdate(output fb.OutputPlugin, id string) *FluentbitOutputsR ...@@ -341,19 +336,35 @@ func FluentbitOutputUpdate(output fb.OutputPlugin, id string) *FluentbitOutputsR
func FluentbitOutputDelete(id string) *FluentbitOutputsResult { func FluentbitOutputDelete(id string) *FluentbitOutputsResult {
var result FluentbitOutputsResult var result FluentbitOutputsResult
// 1. Remove the record from DB // 1. Update ConfigMap
db := mysql.Client() // If the ConfigMap doesn't exist, a new one will be created
outputs, _ := GetFluentbitOutputFromConfigMap()
index := 0
for _, output := range outputs {
if output.Id == id {
break
}
index++
}
if index >= len(outputs) {
result.Status = http.StatusNotFound
return &result
}
outputs = append(outputs[:index], outputs[index+1:]...)
err := db.Where("id = ?", id).Delete(&OutputDBBinding{}).Error err := updateFluentbitOutputConfigMap(outputs)
if err != nil { if err != nil {
result.Status = http.StatusInternalServerError result.Status = http.StatusInternalServerError
return &result return &result
} }
// 2. Keep CRD in inline with DB // 2. Keep CRD in inline with DB
err = syncFluentbitCRDOutputWithDB(db) err = syncFluentbitCRDOutputWithConfigMap(outputs)
if err != nil { if err != nil {
result.Status = http.StatusBadRequest result.Status = http.StatusInternalServerError
return &result return &result
} }
...@@ -361,29 +372,89 @@ func FluentbitOutputDelete(id string) *FluentbitOutputsResult { ...@@ -361,29 +372,89 @@ func FluentbitOutputDelete(id string) *FluentbitOutputsResult {
return &result return &result
} }
func syncFluentbitCRDOutputWithDB(db *gorm.DB) error { func GetFluentbitOutputFromConfigMap() ([]fb.OutputPlugin, error) {
var outputs []OutputDBBinding configMap, err := informers.SharedInformerFactory().Core().V1().ConfigMaps().Lister().ConfigMaps(LoggingNamespace).Get(ConfigMapName)
if err != nil {
return nil, err
}
data := configMap.Data[ConfigMapData]
var outputs []fb.OutputPlugin
if err = jsonIter.UnmarshalFromString(data, &outputs); err != nil {
return nil, err
}
return outputs, nil
}
func updateFluentbitOutputConfigMap(outputs []fb.OutputPlugin) error {
err := db.Where("enable is true").Find(&outputs).Error var data string
data, err := jsonIter.MarshalToString(outputs)
if err != nil { if err != nil {
glog.Errorln(err)
return err return err
} }
var unmarshaledOutputs []fb.Plugin // Update the ConfigMap
config, err := rest.InClusterConfig()
if err != nil {
glog.Errorln(err)
return err
}
for _, output := range outputs { // Creates the clientset
var params []fb.Parameter clientset, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Errorln(err)
return err
}
err = jsonIter.UnmarshalFromString(output.Parameters, &params) configMapClient := clientset.CoreV1().ConfigMaps(LoggingNamespace)
configMap, err := configMapClient.Get(ConfigMapName, metav1.GetOptions{})
if err != nil {
// If the ConfigMap doesn't exist, create a new one
newConfigMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: ConfigMapName,
},
Data: map[string]string{ConfigMapData: data},
}
_, err = configMapClient.Create(newConfigMap)
if err != nil {
glog.Errorln(err)
return err
}
} else {
// update
configMap.Data = map[string]string{ConfigMapData: data}
_, err = configMapClient.Update(configMap)
if err != nil { if err != nil {
glog.Errorln(err)
return err return err
} }
}
return nil
}
unmarshaledOutputs = append(unmarshaledOutputs, fb.Plugin{Type: output.Type, Name: output.Name, Parameters: params}) func syncFluentbitCRDOutputWithConfigMap(outputs []fb.OutputPlugin) error {
var enabledOutputs []fb.Plugin
for _, output := range outputs {
if output.Enable {
enabledOutputs = append(enabledOutputs, fb.Plugin{Type: output.Type, Name: output.Name, Parameters: output.Parameters})
}
} }
// Empty output is not allowed, must specify a null-type output // Empty output is not allowed, must specify a null-type output
if len(unmarshaledOutputs) == 0 { if len(enabledOutputs) == 0 {
unmarshaledOutputs = []fb.Plugin{ enabledOutputs = []fb.Plugin{
{ {
Type: "fluentbit_output", Type: "fluentbit_output",
Name: "fluentbit-output-null", Name: "fluentbit-output-null",
...@@ -407,14 +478,14 @@ func syncFluentbitCRDOutputWithDB(db *gorm.DB) error { ...@@ -407,14 +478,14 @@ func syncFluentbitCRDOutputWithDB(db *gorm.DB) error {
} }
// Create a CRD client interface // Create a CRD client interface
crdclient := fb.CrdClient(crdcs, scheme, "kubesphere-logging-system") crdclient := fb.CrdClient(crdcs, scheme, LoggingNamespace)
fluentbit, err := crdclient.Get("fluent-bit") fluentbit, err := crdclient.Get("fluent-bit")
if err != nil { if err != nil {
return err return err
} }
fluentbit.Spec.Output = unmarshaledOutputs fluentbit.Spec.Output = enabledOutputs
_, err = crdclient.Update("fluent-bit", fluentbit) _, err = crdclient.Update("fluent-bit", fluentbit)
if err != nil { if err != nil {
return err return err
......
...@@ -20,7 +20,6 @@ package log ...@@ -20,7 +20,6 @@ package log
import ( import (
fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit" fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit"
"time"
) )
type FluentbitCRDResult struct { type FluentbitCRDResult struct {
...@@ -52,13 +51,3 @@ type FluentbitOutputsResult struct { ...@@ -52,13 +51,3 @@ type FluentbitOutputsResult struct {
Status int `json:"status"` Status int `json:"status"`
Outputs []fb.OutputPlugin `json:"outputs,omitempty"` Outputs []fb.OutputPlugin `json:"outputs,omitempty"`
} }
type OutputDBBinding struct {
Id uint `gorm:"primary_key;auto_increment;unique"`
Type string `gorm:"not null"`
Name string `gorm:"not null"`
Parameters string `gorm:"not null"`
Internal bool
Enable bool `gorm:"not null"`
Updatetime time.Time `gorm:"not null"`
}
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/golang/glog"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strconv" "strconv"
...@@ -608,7 +609,7 @@ func Query(param QueryParameters) *QueryResult { ...@@ -608,7 +609,7 @@ func Query(param QueryParameters) *QueryResult {
url := fmt.Sprintf("http://%s:%s/%s*/_search", es.Host, es.Port, es.Index) url := fmt.Sprintf("http://%s:%s/%s*/_search", es.Host, es.Port, es.Index)
request, err := http.NewRequest("GET", url, bytes.NewBuffer(query)) request, err := http.NewRequest("GET", url, bytes.NewBuffer(query))
if err != nil { if err != nil {
//fmt.Println("Create request error ", err.Error()) glog.Errorln(err)
queryResult = new(QueryResult) queryResult = new(QueryResult)
queryResult.Status = http.StatusNotFound queryResult.Status = http.StatusNotFound
return queryResult return queryResult
...@@ -617,7 +618,7 @@ func Query(param QueryParameters) *QueryResult { ...@@ -617,7 +618,7 @@ func Query(param QueryParameters) *QueryResult {
response, err := client.Do(request) response, err := client.Do(request)
if err != nil { if err != nil {
//fmt.Println("Send request error ", err.Error()) glog.Errorln(err)
queryResult = new(QueryResult) queryResult = new(QueryResult)
queryResult.Status = http.StatusNotFound queryResult.Status = http.StatusNotFound
return queryResult return queryResult
...@@ -626,7 +627,7 @@ func Query(param QueryParameters) *QueryResult { ...@@ -626,7 +627,7 @@ func Query(param QueryParameters) *QueryResult {
body, err := ioutil.ReadAll(response.Body) body, err := ioutil.ReadAll(response.Body)
if err != nil { if err != nil {
//fmt.Println("Read response error ", err.Error()) glog.Errorln(err)
queryResult = new(QueryResult) queryResult = new(QueryResult)
queryResult.Status = http.StatusNotFound queryResult.Status = http.StatusNotFound
return queryResult return queryResult
......
...@@ -72,9 +72,9 @@ type Plugin struct { ...@@ -72,9 +72,9 @@ type Plugin struct {
// Fluent-bit output plugins // Fluent-bit output plugins
type OutputPlugin struct { type OutputPlugin struct {
Plugin Plugin
Id uint `json:"id"` Id string `json:"id"`
Enable bool `json:"enable"` Enable bool `json:"enable"`
Updatetime time.Time `json:"updatetime"` Updatetime time.Time `json:"updatetime,omitempty"`
} }
// Parameter generic parameter type to handle values from different sources // Parameter generic parameter type to handle values from different sources
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册
新手
引导
客服 返回
顶部