未验证 提交 3cee5d49 编写于 作者: R richardxz 提交者: GitHub

Merge pull request #92 from richardxz/master

add namespace's resource usage Statistics and fix some bugs
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -17,13 +14,15 @@ limitations under the License.
package client
import (
"github.com/jinzhu/gorm"
//_ "github.com/jinzhu/gorm/dialects/mysql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/golang/glog"
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/mysql"
"log"
"kubesphere.io/kubesphere/pkg/logs"
"kubesphere.io/kubesphere/pkg/options"
)
......@@ -34,29 +33,40 @@ const database = "kubesphere"
func NewDBClient() *gorm.DB {
if dbClient != nil {
return dbClient
err := dbClient.DB().Ping()
if err == nil {
return dbClient
} else {
glog.Error(err)
panic(err)
}
}
user := options.ServerOptions.GetMysqlUser()
passwd := options.ServerOptions.GetMysqlPassword()
addr := options.ServerOptions.GetMysqlAddr()
conn := fmt.Sprintf("%s:%s@tcp(%s)/mysql?charset=utf8mb4&parseTime=True&loc=Local", user, passwd, addr)
db, err := gorm.Open("mysql", conn)
if dbClient == nil {
conn := fmt.Sprintf("%s:%s@tcp(%s)/mysql?charset=utf8mb4&parseTime=True&loc=Local", user, passwd, addr)
db, err := gorm.Open("mysql", conn)
if err != nil {
glog.Error(err)
panic(err)
}
if err != nil {
glog.Error(err)
panic(err)
}
db.Exec(fmt.Sprintf("create database if not exists %s;", database))
db.Exec(fmt.Sprintf("create database if not exists %s;", database))
db.Close()
}
conn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", user, passwd, addr, database)
db, err = gorm.Open("mysql", conn)
conn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", user, passwd, addr, database)
db, err := gorm.Open("mysql", conn)
if err != nil {
glog.Error(err)
panic(err)
}
db.SetLogger(log.New(logs.GlogWriter{}, " ", 0))
dbClient = db
return dbClient
}
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -19,43 +16,18 @@ package client
import (
"github.com/golang/glog"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"kubesphere.io/kubesphere/pkg/options"
)
var k8sClient *kubernetes.Clientset
func getKubeConfig() (kubeConfig *rest.Config, err error) {
kubeConfigFile := options.ServerOptions.GetKubeConfigFile()
if len(kubeConfigFile) > 0 {
kubeConfig, err = clientcmd.BuildConfigFromFlags("", kubeConfigFile)
if err != nil {
return nil, err
}
} else {
kubeConfig, err = rest.InClusterConfig()
if err != nil {
return nil, err
}
}
return kubeConfig, nil
}
func NewK8sClient() *kubernetes.Clientset {
if k8sClient != nil {
return k8sClient
}
kubeConfig, err := getKubeConfig()
kubeConfig, err := options.ServerOptions.GetKubeConfig()
if err != nil {
glog.Error(err)
panic(err)
......
......@@ -17,17 +17,17 @@ limitations under the License.
package controllers
import (
"encoding/json"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/api/rbac/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
func (ctl *ClusterRoleCtl) generateObjec(item v1.ClusterRole) *ClusterRole {
func (ctl *ClusterRoleCtl) generateObject(item v1.ClusterRole) *ClusterRole {
name := item.Name
if strings.HasPrefix(name, "system:") {
return nil
......@@ -38,9 +38,7 @@ func (ctl *ClusterRoleCtl) generateObjec(item v1.ClusterRole) *ClusterRole {
createTime = time.Now()
}
annotation, _ := json.Marshal(item.Annotations)
object := &ClusterRole{Name: name, CreateTime: createTime, AnnotationStr: string(annotation)}
object := &ClusterRole{Name: name, CreateTime: createTime, Annotation: Annotation{item.Annotations}}
return object
}
......@@ -64,47 +62,48 @@ func (ctl *ClusterRoleCtl) listAndWatch() {
db = db.CreateTable(&ClusterRole{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Rbac().V1().ClusterRoles().Informer()
lister := kubeInformerFactory.Rbac().V1().ClusterRoles().Lister()
clusterRoleList, err := k8sClient.RbacV1().ClusterRoles().List(metaV1.ListOptions{})
list, err := lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
}
for _, item := range clusterRoleList.Items {
obj := ctl.generateObjec(item)
if obj != nil {
db.Create(obj)
}
}
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
clusterRoleWatcher, err := k8sClient.RbacV1().ClusterRoles().Watch(metaV1.ListOptions{})
if err != nil {
glog.Error(err)
return
}
for {
select {
case <-ctl.stopChan:
return
case event := <-clusterRoleWatcher.ResultChan():
var role ClusterRole
if event.Object == nil {
panic("watch timeout, restart clusterRole controller")
}
object := event.Object.(*v1.ClusterRole)
if event.Type == watch.Deleted {
db.Where("name=? And namespace=?", object.Name, "\"\"").Find(&role)
db.Delete(role)
break
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
object := obj.(*v1.ClusterRole)
mysqlObject := ctl.generateObject(*object)
if mysqlObject != nil {
db.Create(mysqlObject)
}
obj := ctl.generateObjec(*object)
if obj != nil {
db.Save(obj)
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1.ClusterRole)
mysqlObject := ctl.generateObject(*object)
if mysqlObject != nil {
db.Save(mysqlObject)
}
}
}
},
DeleteFunc: func(obj interface{}) {
var item ClusterRole
object := obj.(*v1.ClusterRole)
db.Where("name=?", object.Name).Find(&item)
db.Delete(item)
},
})
informer.Run(ctl.stopChan)
}
func (ctl *ClusterRoleCtl) CountWithConditions(conditions string) int {
......@@ -123,12 +122,6 @@ func (ctl *ClusterRoleCtl) ListWithConditions(conditions string, paging *Paging)
listWithConditions(db, &total, &object, &list, conditions, paging, order)
for index, item := range list {
annotation := make(map[string]string)
json.Unmarshal([]byte(item.AnnotationStr), &annotation)
list[index].Annotation = annotation
list[index].AnnotationStr = ""
}
return total, list, nil
}
......
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import "github.com/jinzhu/gorm"
func listWithConditions(db *gorm.DB, total *int, object, list interface{}, conditions string, paging *Paging, order string) {
if len(conditions) == 0 {
db.Model(object).Count(total)
} else {
db.Model(object).Where(conditions).Count(total)
}
if paging != nil {
if len(conditions) > 0 {
db.Where(conditions).Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list)
} else {
db.Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list)
}
} else {
if len(conditions) > 0 {
db.Where(conditions).Order(order).Find(list)
} else {
db.Order(order).Find(list)
}
}
}
func countWithConditions(db *gorm.DB, conditions string, object interface{}) int {
var count int
if len(conditions) == 0 {
db.Model(object).Count(&count)
} else {
db.Model(object).Where(conditions).Count(&count)
}
return count
}
......@@ -21,14 +21,13 @@ import (
"time"
"github.com/golang/glog"
"k8s.io/api/apps/v1beta2"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"kubesphere.io/kubesphere/pkg/client"
"k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
func (ctl *DaemonsetCtl) generateObject(item v1beta2.DaemonSet) *Daemonset {
func (ctl *DaemonsetCtl) generateObject(item v1.DaemonSet) *Daemonset {
var app string
var status string
name := item.Name
......@@ -53,24 +52,20 @@ func (ctl *DaemonsetCtl) generateObject(item v1beta2.DaemonSet) *Daemonset {
}
if availablePodNum >= desirePodNum {
status = running
status = Running
} else {
status = updating
status = Updating
}
annotation, _ := json.Marshal(item.Annotations)
object := &Daemonset{Namespace: namespace, Name: name, Available: availablePodNum, Desire: desirePodNum,
App: app, CreateTime: createTime, Status: status, NodeSelector: string(nodeSelectorStr), AnnotationStr: string(annotation)}
App: app, CreateTime: createTime, Status: status, NodeSelector: string(nodeSelectorStr), Annotation: Annotation{item.Annotations}}
return object
}
func (ctl *DaemonsetCtl) listAndWatch() {
defer func() {
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
......@@ -86,43 +81,45 @@ func (ctl *DaemonsetCtl) listAndWatch() {
db = db.CreateTable(&Daemonset{})
k8sClient := client.NewK8sClient()
deoloyList, err := k8sClient.AppsV1beta2().DaemonSets("").List(metaV1.ListOptions{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Apps().V1().DaemonSets().Informer()
lister := kubeInformerFactory.Apps().V1().DaemonSets().Lister()
list, err := lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
}
for _, item := range deoloyList.Items {
obj := ctl.generateObject(item)
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
}
watcher, err := k8sClient.AppsV1beta2().DaemonSets("").Watch(metaV1.ListOptions{})
if err != nil {
glog.Error(err)
return
}
for {
select {
case <-ctl.stopChan:
return
case event := <-watcher.ResultChan():
var ss Daemonset
if event.Object == nil {
panic("watch timeout, restart daemonset controller")
}
object := event.Object.(*v1beta2.DaemonSet)
if event.Type == watch.Deleted {
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&ss)
db.Delete(ss)
break
}
obj := ctl.generateObject(*object)
db.Save(obj)
}
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
object := obj.(*v1.DaemonSet)
mysqlObject := ctl.generateObject(*object)
db.Create(mysqlObject)
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1.DaemonSet)
mysqlObject := ctl.generateObject(*object)
db.Save(mysqlObject)
},
DeleteFunc: func(obj interface{}) {
var item Daemonset
object := obj.(*v1.DaemonSet)
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item)
db.Delete(item)
},
})
informer.Run(ctl.stopChan)
}
func (ctl *DaemonsetCtl) CountWithConditions(conditions string) int {
......@@ -140,12 +137,6 @@ func (ctl *DaemonsetCtl) ListWithConditions(conditions string, paging *Paging) (
listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order)
for index, item := range list {
annotation := make(map[string]string)
json.Unmarshal([]byte(item.AnnotationStr), &annotation)
list[index].Annotation = annotation
list[index].AnnotationStr = ""
}
return total, list, nil
}
......
......@@ -17,16 +17,17 @@ limitations under the License.
package controllers
import (
"encoding/json"
"time"
"github.com/golang/glog"
"k8s.io/api/apps/v1beta2"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/api/apps/v1"
"k8s.io/client-go/informers"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
)
func (ctl *DeploymentCtl) generateObject(item v1beta2.Deployment) *Deployment {
func (ctl *DeploymentCtl) generateObject(item v1.Deployment) *Deployment {
var app string
var status string
var updateTime time.Time
......@@ -53,19 +54,17 @@ func (ctl *DeploymentCtl) generateObject(item v1beta2.Deployment) *Deployment {
}
if item.Annotations["state"] == "stop" {
status = stopping
status = Stopped
} else {
if availablePodNum >= desirePodNum {
status = running
status = Running
} else {
status = updating
status = Updating
}
}
annotation, _ := json.Marshal(item.Annotations)
return &Deployment{Namespace: namespace, Name: name, Available: availablePodNum, Desire: desirePodNum,
App: app, UpdateTime: updateTime, Status: status, AnnotationStr: string(annotation)}
App: app, UpdateTime: updateTime, Status: status, Annotation: Annotation{item.Annotations}}
}
func (ctl *DeploymentCtl) listAndWatch() {
......@@ -85,45 +84,44 @@ func (ctl *DeploymentCtl) listAndWatch() {
db = db.CreateTable(&Deployment{})
k8sClient := ctl.K8sClient
deoloyList, err := k8sClient.AppsV1beta2().Deployments("").List(metaV1.ListOptions{})
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Apps().V1().Deployments().Informer()
lister := kubeInformerFactory.Apps().V1().Deployments().Lister()
list, err := lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
}
for _, item := range deoloyList.Items {
obj := ctl.generateObject(item)
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
}
watcher, err := k8sClient.AppsV1beta2().Deployments("").Watch(metaV1.ListOptions{})
if err != nil {
glog.Error(err)
return
}
for {
glog.Error("here")
select {
case <-ctl.stopChan:
return
case event := <-watcher.ResultChan():
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
object := obj.(*v1.Deployment)
mysqlObject := ctl.generateObject(*object)
db.Create(mysqlObject)
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1.Deployment)
mysqlObject := ctl.generateObject(*object)
db.Save(mysqlObject)
},
DeleteFunc: func(obj interface{}) {
var deploy Deployment
if event.Object == nil {
panic("watch timeout, restart deployment controller")
}
object := event.Object.(*v1beta2.Deployment)
if event.Type == watch.Deleted {
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&deploy)
db.Delete(deploy)
break
}
obj := ctl.generateObject(*object)
db.Save(obj)
}
}
object := obj.(*v1.Deployment)
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&deploy)
db.Delete(deploy)
},
})
informer.Run(ctl.stopChan)
}
func (ctl *DeploymentCtl) CountWithConditions(conditions string) int {
......@@ -141,12 +139,6 @@ func (ctl *DeploymentCtl) ListWithConditions(conditions string, paging *Paging)
listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order)
for index, item := range list {
annotation := make(map[string]string)
json.Unmarshal([]byte(item.AnnotationStr), &annotation)
list[index].Annotation = annotation
list[index].AnnotationStr = ""
}
return total, list, nil
}
......
......@@ -17,16 +17,14 @@ limitations under the License.
package controllers
import (
"encoding/json"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/api/extensions/v1beta1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"kubesphere.io/kubesphere/pkg/client"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
func (ctl *IngressCtl) generateObject(item v1beta1.Ingress) *Ingress {
......@@ -49,15 +47,14 @@ func (ctl *IngressCtl) generateObject(item v1beta1.Ingress) *Ingress {
ip = strings.Join(ipList, ",")
}
annotation, _ := json.Marshal(item.Annotations)
object := &Ingress{Namespace: namespace, Name: name, TlsTermination: tls, Ip: ip, CreateTime: createTime, AnnotationStr: string(annotation)}
object := &Ingress{Namespace: namespace, Name: name, TlsTermination: tls, Ip: ip, CreateTime: createTime, Annotation: Annotation{item.Annotations}}
return object
}
func (ctl *IngressCtl) listAndWatch() {
defer func() {
defer close(ctl.aliveChan)
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
......@@ -73,43 +70,45 @@ func (ctl *IngressCtl) listAndWatch() {
db = db.CreateTable(&Ingress{})
k8sClient := client.NewK8sClient()
list, err := k8sClient.ExtensionsV1beta1().Ingresses("").List(metaV1.ListOptions{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Extensions().V1beta1().Ingresses().Informer()
lister := kubeInformerFactory.Extensions().V1beta1().Ingresses().Lister()
list, err := lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
}
for _, item := range list.Items {
obj := ctl.generateObject(item)
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
}
watcher, err := k8sClient.ExtensionsV1beta1().Ingresses("").Watch(metaV1.ListOptions{})
if err != nil {
glog.Error(err)
return
}
for {
select {
case <-ctl.stopChan:
return
case event := <-watcher.ResultChan():
var ing Ingress
if event.Object == nil {
panic("watch timeout, restart ingress controller")
}
object := event.Object.(*v1beta1.Ingress)
if event.Type == watch.Deleted {
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&ing)
db.Delete(ing)
break
}
obj := ctl.generateObject(*object)
db.Save(obj)
}
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
object := obj.(*v1beta1.Ingress)
mysqlObject := ctl.generateObject(*object)
db.Create(mysqlObject)
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1beta1.Ingress)
mysqlObject := ctl.generateObject(*object)
db.Save(mysqlObject)
},
DeleteFunc: func(obj interface{}) {
var item Ingress
object := obj.(*v1beta1.Ingress)
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item)
db.Delete(item)
},
})
informer.Run(ctl.stopChan)
}
func (ctl *IngressCtl) CountWithConditions(conditions string) int {
......@@ -127,12 +126,6 @@ func (ctl *IngressCtl) ListWithConditions(conditions string, paging *Paging) (in
listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order)
for index, item := range list {
annotation := make(map[string]string)
json.Unmarshal([]byte(item.AnnotationStr), &annotation)
list[index].Annotation = annotation
list[index].AnnotationStr = ""
}
return total, list, nil
}
......
......@@ -27,8 +27,11 @@ import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/resource"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/options"
......@@ -229,15 +232,14 @@ func (ctl *NamespaceCtl) generateObject(item v1.Namespace) *Namespace {
createTime = time.Now()
}
annotation, _ := json.Marshal(item.Annotations)
object := &Namespace{Name: name, CreateTime: createTime, Status: status, AnnotationStr: string(annotation)}
object := &Namespace{Name: name, CreateTime: createTime, Status: status, Annotation: Annotation{item.Annotations}}
return object
}
func (ctl *NamespaceCtl) listAndWatch() {
defer func() {
defer close(ctl.aliveChan)
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
......@@ -252,50 +254,45 @@ func (ctl *NamespaceCtl) listAndWatch() {
db = db.CreateTable(&Namespace{})
k8sClient := client.NewK8sClient()
list, err := k8sClient.CoreV1().Namespaces().List(metaV1.ListOptions{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Core().V1().Namespaces().Informer()
lister := kubeInformerFactory.Core().V1().Namespaces().Lister()
list, err := lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
}
for _, item := range list.Items {
obj := ctl.generateObject(item)
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
ctl.createRoleAndRuntime(item)
}
watcher, err := k8sClient.CoreV1().Namespaces().Watch(metaV1.ListOptions{})
if err != nil {
glog.Error(err)
return
}
for {
select {
case <-ctl.stopChan:
return
case event := <-watcher.ResultChan():
var ns Namespace
if event.Object == nil {
panic("watch timeout, restart namespace controller")
}
object := event.Object.(*v1.Namespace)
if event.Type == watch.Deleted {
db.Where("name=?", object.Name).Find(&ns)
db.Delete(ns)
ctl.deleteOpRuntime(*object)
break
}
ctl.createRoleAndRuntime(*object)
obj := ctl.generateObject(*object)
db.Save(obj)
}
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
object := obj.(*v1.Namespace)
mysqlObject := ctl.generateObject(*object)
db.Create(mysqlObject)
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1.Namespace)
mysqlObject := ctl.generateObject(*object)
db.Save(mysqlObject)
},
DeleteFunc: func(obj interface{}) {
var item Namespace
object := obj.(*v1.Namespace)
db.Where("name=?", object.Name).Find(&item)
db.Delete(item)
},
})
informer.Run(ctl.stopChan)
}
func (ctl *NamespaceCtl) CountWithConditions(conditions string) int {
......@@ -313,11 +310,12 @@ func (ctl *NamespaceCtl) ListWithConditions(conditions string, paging *Paging) (
listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order)
for index, item := range list {
annotation := make(map[string]string)
json.Unmarshal([]byte(item.AnnotationStr), &annotation)
list[index].Annotation = annotation
list[index].AnnotationStr = ""
for index := range list {
usage, err := ctl.GetNamespaceQuota(list[index].Name)
if err == nil {
list[index].Usaeg = usage
}
}
return total, list, nil
}
......@@ -328,3 +326,28 @@ func (ctl *NamespaceCtl) Count(namespace string) int {
db.Model(&Namespace{}).Count(&count)
return count
}
func getUsage(namespace, resource string) int {
ctl := rec.controllers[resource]
return ctl.Count(namespace)
}
func (ctl *NamespaceCtl) GetNamespaceQuota(namespace string) (v1.ResourceList, error) {
usage := make(v1.ResourceList)
resourceList := []string{Daemonsets, Deployments, Ingresses, Roles, Services, Statefulsets, PersistentVolumeClaim, Pods}
for _, resourceName := range resourceList {
used := getUsage(namespace, resourceName)
var quantity resource.Quantity
quantity.Set(int64(used))
usage[v1.ResourceName(resourceName)] = quantity
}
podCtl := rec.controllers[Pods]
var quantity resource.Quantity
used := podCtl.CountWithConditions(fmt.Sprintf("status=\"%s\" And namespace=\"%s\"", "Running", namespace))
quantity.Set(int64(used))
usage["runningPods"] = quantity
return usage, nil
}
......@@ -18,15 +18,75 @@ package controllers
import (
"encoding/json"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"kubesphere.io/kubesphere/pkg/client"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
const inUse = "in_use_pods"
func (ctl *PodCtl) addAnnotationToPvc(item v1.Pod) {
volumes := item.Spec.Volumes
for _, volume := range volumes {
pvc := volume.PersistentVolumeClaim
if pvc != nil {
name := pvc.ClaimName
Pvc, _ := ctl.K8sClient.CoreV1().PersistentVolumeClaims(item.Namespace).Get(name, metaV1.GetOptions{})
if Pvc.Annotations == nil {
Pvc.Annotations = make(map[string]string)
}
annotation := Pvc.Annotations
if len(annotation[inUse]) == 0 {
pods := []string{item.Name}
str, _ := json.Marshal(pods)
annotation[inUse] = string(str)
} else {
var pods []string
json.Unmarshal([]byte(annotation[inUse]), pods)
for _, pod := range pods {
if pod == item.Name {
return
}
pods = append(pods, item.Name)
str, _ := json.Marshal(pods)
annotation[inUse] = string(str)
}
}
ctl.K8sClient.CoreV1().PersistentVolumeClaims(item.Namespace).Update(Pvc)
}
}
}
func (ctl *PodCtl) delAnnotationFromPvc(item v1.Pod) {
volumes := item.Spec.Volumes
for _, volume := range volumes {
pvc := volume.PersistentVolumeClaim
if pvc != nil {
name := pvc.ClaimName
Pvc, _ := ctl.K8sClient.CoreV1().PersistentVolumeClaims(item.Namespace).Get(name, metaV1.GetOptions{})
annotation := Pvc.Annotations
var pods []string
json.Unmarshal([]byte(annotation[inUse]), pods)
for index, pod := range pods {
if pod == item.Name {
pods = append(pods[:index], pods[index+1:]...)
}
}
str, _ := json.Marshal(pods)
annotation[inUse] = string(str)
ctl.K8sClient.CoreV1().PersistentVolumeClaims(item.Namespace).Update(Pvc)
}
}
}
func (ctl *PodCtl) generateObject(item v1.Pod) *Pod {
name := item.Name
namespace := item.Namespace
......@@ -37,13 +97,15 @@ func (ctl *PodCtl) generateObject(item v1.Pod) *Pod {
createTime := item.CreationTimestamp.Time
containerStatus := item.Status.ContainerStatuses
containerSpecs := item.Spec.Containers
var containers []Container
var containers Containers
for _, containerSpec := range containerSpecs {
var container Container
container.Name = containerSpec.Name
container.Image = containerSpec.Image
container.Ports = containerSpec.Ports
container.Resources = containerSpec.Resources
for _, status := range containerStatus {
if container.Name == status.Name {
container.Ready = status.Ready
......@@ -53,25 +115,13 @@ func (ctl *PodCtl) generateObject(item v1.Pod) *Pod {
containers = append(containers, container)
}
containerStr, _ := json.Marshal(containers)
annotation, _ := json.Marshal(item.Annotations)
object := &Pod{Namespace: namespace, Name: name, Node: nodeName, PodIp: podIp, Status: status, NodeIp: nodeIp,
CreateTime: createTime, ContainerStr: string(containerStr), AnnotationStr: string(annotation)}
CreateTime: createTime, Annotation: Annotation{item.Annotations}, Containers: containers}
return object
}
func (ctl *PodCtl) listAndWatch() {
defer func() {
defer close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
}
}()
db := ctl.DB
if db.HasTable(&Pod{}) {
......@@ -81,43 +131,47 @@ func (ctl *PodCtl) listAndWatch() {
db = db.CreateTable(&Pod{})
k8sClient := client.NewK8sClient()
list, err := k8sClient.CoreV1().Pods("").List(meta_v1.ListOptions{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Core().V1().Pods().Informer()
lister := kubeInformerFactory.Core().V1().Pods().Lister()
list, err := lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
panic(err)
}
for _, item := range list.Items {
obj := ctl.generateObject(item)
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
}
watcher, err := k8sClient.CoreV1().Pods("").Watch(meta_v1.ListOptions{})
if err != nil {
glog.Error(err)
return
}
for {
select {
case <-ctl.stopChan:
return
case event := <-watcher.ResultChan():
var po Pod
if event.Object == nil {
panic("watch timeout, restart pod controller")
}
object := event.Object.(*v1.Pod)
if event.Type == watch.Deleted {
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&po)
db.Delete(po)
break
}
obj := ctl.generateObject(*object)
db.Save(obj)
}
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
object := obj.(*v1.Pod)
mysqlObject := ctl.generateObject(*object)
db.Create(mysqlObject)
ctl.addAnnotationToPvc(*object)
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1.Pod)
mysqlObject := ctl.generateObject(*object)
db.Save(mysqlObject)
},
DeleteFunc: func(obj interface{}) {
var item Pod
object := obj.(*v1.Pod)
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item)
ctl.delAnnotationFromPvc(*object)
db.Delete(item)
},
})
informer.Run(ctl.stopChan)
}
func (ctl *PodCtl) CountWithConditions(conditions string) int {
......@@ -135,17 +189,6 @@ func (ctl *PodCtl) ListWithConditions(conditions string, paging *Paging) (int, i
listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order)
for index, item := range list {
var containers []Container
json.Unmarshal([]byte(item.ContainerStr), &containers)
list[index].Containers = containers
list[index].ContainerStr = ""
annotation := make(Annotation)
json.Unmarshal([]byte(item.AnnotationStr), &annotation)
list[index].Annotation = annotation
list[index].AnnotationStr = ""
}
return total, list, nil
}
......
......@@ -24,10 +24,9 @@ import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"kubesphere.io/kubesphere/pkg/client"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
const creator = "creator"
......@@ -60,17 +59,16 @@ func (ctl *PvcCtl) generateObject(item *v1.PersistentVolumeClaim) *Pvc {
}
accessModeStr = strings.Join(accessModeList, ",")
annotation, _ := json.Marshal(item.Annotations)
object := &Pvc{Namespace: namespace, Name: name, Status: status, Capacity: capacity,
AccessMode: accessModeStr, StorageClassName: storageClass, CreateTime: createTime, AnnotationStr: string(annotation)}
AccessMode: accessModeStr, StorageClassName: storageClass, CreateTime: createTime, Annotation: Annotation{item.Annotations}}
return object
}
func (ctl *PvcCtl) listAndWatch() {
defer func() {
defer close(ctl.aliveChan)
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
......@@ -86,43 +84,44 @@ func (ctl *PvcCtl) listAndWatch() {
db = db.CreateTable(&Pvc{})
k8sClient := client.NewK8sClient()
pvcList, err := k8sClient.CoreV1().PersistentVolumeClaims("").List(metaV1.ListOptions{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Core().V1().PersistentVolumeClaims().Informer()
lister := kubeInformerFactory.Core().V1().PersistentVolumeClaims().Lister()
list, err := lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
}
for _, item := range pvcList.Items {
obj := ctl.generateObject(&item)
for _, item := range list {
obj := ctl.generateObject(item)
db.Create(obj)
}
watcher, err := k8sClient.CoreV1().PersistentVolumeClaims("").Watch(metaV1.ListOptions{})
if err != nil {
glog.Error(err)
return
}
for {
select {
case <-ctl.stopChan:
return
case event := <-watcher.ResultChan():
var pvc Pvc
if event.Object == nil {
panic("watch timeout, restart pvc controller")
}
object := event.Object.(*v1.PersistentVolumeClaim)
if event.Type == watch.Deleted {
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&pvc)
db.Delete(pvc)
break
}
obj := ctl.generateObject(object)
db.Save(obj)
}
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
object := obj.(*v1.PersistentVolumeClaim)
mysqlObject := ctl.generateObject(object)
db.Create(mysqlObject)
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1.PersistentVolumeClaim)
mysqlObject := ctl.generateObject(object)
db.Save(mysqlObject)
},
DeleteFunc: func(obj interface{}) {
var item Pvc
object := obj.(*v1.PersistentVolumeClaim)
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item)
db.Delete(item)
},
})
informer.Run(ctl.stopChan)
}
func (ctl *PvcCtl) CountWithConditions(conditions string) int {
......@@ -140,11 +139,17 @@ func (ctl *PvcCtl) ListWithConditions(conditions string, paging *Paging) (int, i
listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order)
for index, item := range list {
annotation := make(map[string]string)
json.Unmarshal([]byte(item.AnnotationStr), &annotation)
list[index].Annotation = annotation
list[index].AnnotationStr = ""
for index := range list {
inUsePods := list[index].Annotation.Values[inUse]
var pods []string
json.Unmarshal([]byte(inUsePods), &pods)
if len(pods) > 0 {
list[index].InUse = true
} else {
list[index].InUse = false
}
}
return total, list, nil
......
......@@ -17,16 +17,14 @@ limitations under the License.
package controllers
import (
"encoding/json"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/api/rbac/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"kubesphere.io/kubesphere/pkg/client"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
func (ctl *RoleCtl) generateObject(item v1.Role) *Role {
......@@ -40,9 +38,7 @@ func (ctl *RoleCtl) generateObject(item v1.Role) *Role {
createTime = time.Now()
}
annotation, _ := json.Marshal(item.Annotations)
object := &Role{Namespace: namespace, Name: name, CreateTime: createTime, AnnotationStr: string(annotation)}
object := &Role{Namespace: namespace, Name: name, CreateTime: createTime, Annotation: Annotation{item.Annotations}}
return object
}
......@@ -65,49 +61,49 @@ func (ctl *RoleCtl) listAndWatch() {
db = db.CreateTable(&Role{})
k8sClient := client.NewK8sClient()
roleList, err := k8sClient.RbacV1().Roles("").List(metaV1.ListOptions{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Rbac().V1().Roles().Informer()
lister := kubeInformerFactory.Rbac().V1().Roles().Lister()
list, err := lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
}
for _, item := range roleList.Items {
obj := ctl.generateObject(item)
if obj != nil {
db.Create(obj)
}
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
}
roleWatcher, err := k8sClient.RbacV1().Roles("").Watch(metaV1.ListOptions{})
if err != nil {
glog.Error(err)
return
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
for {
select {
case <-ctl.stopChan:
return
case event := <-roleWatcher.ResultChan():
var role Role
if event.Object == nil {
panic("watch timeout, restart role controller")
}
object := event.Object.(*v1.Role)
if event.Type == watch.Deleted {
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&role)
db.Delete(role)
break
object := obj.(*v1.Role)
mysqlObject := ctl.generateObject(*object)
if mysqlObject != nil {
db.Create(mysqlObject)
}
obj := ctl.generateObject(*object)
if obj != nil {
db.Save(obj)
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1.Role)
mysqlObject := ctl.generateObject(*object)
if mysqlObject != nil {
db.Save(mysqlObject)
}
break
}
}
},
DeleteFunc: func(obj interface{}) {
var item Role
object := obj.(*v1.Role)
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item)
db.Delete(item)
},
})
informer.Run(ctl.stopChan)
}
func (ctl *RoleCtl) CountWithConditions(conditions string) int {
......@@ -125,12 +121,6 @@ func (ctl *RoleCtl) ListWithConditions(conditions string, paging *Paging) (int,
listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order)
for index, item := range list {
annotation := make(map[string]string)
json.Unmarshal([]byte(item.AnnotationStr), &annotation)
list[index].Annotation = annotation
list[index].AnnotationStr = ""
}
return total, list, nil
}
......
......@@ -28,15 +28,15 @@ import (
type resourceControllers struct {
controllers map[string]Controller
db *gorm.DB
k8sClient *kubernetes.Clientset
}
var stopChan chan struct{}
var rec resourceControllers
func (rec *resourceControllers) runContoller(name string) {
var ctl Controller
attr := CommonAttribute{DB: rec.db, K8sClient: rec.k8sClient, stopChan: stopChan, aliveChan: make(chan struct{})}
attr := CommonAttribute{DB: client.NewDBClient(), K8sClient: rec.k8sClient, stopChan: stopChan, aliveChan: make(chan struct{})}
switch name {
case Deployments:
ctl = &DeploymentCtl{attr}
......@@ -69,21 +69,39 @@ func (rec *resourceControllers) runContoller(name string) {
}
func dbHealthCheck(db *gorm.DB) {
for {
count := 0
var err error
for k := 0; k < 5; k++ {
err = db.DB().Ping()
if err != nil {
count++
}
time.Sleep(1 * time.Second)
}
if count > 3 {
panic(err)
}
}
}
func Run() {
db := client.NewDBClient()
stopChan := make(chan struct{})
defer db.Commit()
defer db.Close()
defer close(stopChan)
rec := resourceControllers{k8sClient: client.NewK8sClient(), db: db, controllers: make(map[string]Controller)}
rec = resourceControllers{k8sClient: client.NewK8sClient(), controllers: make(map[string]Controller)}
for _, item := range []string{Deployments, Statefulsets, Daemonsets, PersistentVolumeClaim, Pods, Services,
Ingresses, Roles, ClusterRoles, Namespaces, StorageClasses} {
rec.runContoller(item)
}
go dbHealthCheck(client.NewDBClient())
for {
for ctlName, controller := range rec.controllers {
select {
......
......@@ -17,18 +17,16 @@ limitations under the License.
package controllers
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"kubesphere.io/kubesphere/pkg/client"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
const (
......@@ -119,16 +117,15 @@ func (ctl *ServiceCtl) generateObject(item v1.Service) *Service {
ports = ports[0 : len(ports)-1]
}
annotation, _ := json.Marshal(item.Annotations)
object := &Service{Namespace: namespace, Name: name, ServiceType: serviceType, ExternalIp: externalIp,
VirtualIp: vip, CreateTime: createTime, Ports: ports, AnnotationStr: string(annotation)}
VirtualIp: vip, CreateTime: createTime, Ports: ports, Annotation: Annotation{item.Annotations}}
return object
}
func (ctl *ServiceCtl) listAndWatch() {
defer func() {
defer close(ctl.aliveChan)
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
......@@ -143,45 +140,45 @@ func (ctl *ServiceCtl) listAndWatch() {
db = db.CreateTable(&Service{})
k8sClient := client.NewK8sClient()
svcList, err := k8sClient.CoreV1().Services("").List(metaV1.ListOptions{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Core().V1().Services().Informer()
lister := kubeInformerFactory.Core().V1().Services().Lister()
list, err := lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
}
for _, item := range svcList.Items {
obj := ctl.generateObject(item)
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
}
watcher, err := k8sClient.CoreV1().Services("").Watch(metaV1.ListOptions{})
if err != nil {
glog.Error(err)
return
}
for {
select {
case <-ctl.stopChan:
return
case event := <-watcher.ResultChan():
var svc Service
if event.Object == nil {
panic("watch timeout, restart service controller")
}
object := event.Object.(*v1.Service)
if event.Type == watch.Deleted {
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&svc)
db.Delete(svc)
break
}
obj := ctl.generateObject(*object)
db.Save(obj)
}
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
object := obj.(*v1.Service)
mysqlObject := ctl.generateObject(*object)
db.Create(mysqlObject)
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1.Service)
mysqlObject := ctl.generateObject(*object)
db.Save(mysqlObject)
},
DeleteFunc: func(obj interface{}) {
var item Service
object := obj.(*v1.Service)
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item)
db.Delete(item)
},
})
informer.Run(ctl.stopChan)
}
func (ctl *ServiceCtl) CountWithConditions(conditions string) int {
......@@ -199,12 +196,6 @@ func (ctl *ServiceCtl) ListWithConditions(conditions string, paging *Paging) (in
listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order)
for index, item := range list {
annotation := make(map[string]string)
json.Unmarshal([]byte(item.AnnotationStr), &annotation)
list[index].Annotation = annotation
list[index].AnnotationStr = ""
}
return total, list, nil
}
......
......@@ -17,18 +17,17 @@ limitations under the License.
package controllers
import (
"encoding/json"
"time"
"github.com/golang/glog"
"k8s.io/api/apps/v1beta2"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"kubesphere.io/kubesphere/pkg/client"
"k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
func (ctl *StatefulsetCtl) generateObject(item v1beta2.StatefulSet) *Statefulset {
func (ctl *StatefulsetCtl) generateObject(item v1.StatefulSet) *Statefulset {
var app string
var status string
name := item.Name
......@@ -50,26 +49,24 @@ func (ctl *StatefulsetCtl) generateObject(item v1beta2.StatefulSet) *Statefulset
}
if item.Annotations["state"] == "stop" {
status = stopping
status = Stopped
} else {
if availablePodNum >= desirePodNum {
status = running
status = Running
} else {
status = updating
status = Updating
}
}
annotation, _ := json.Marshal(item.Annotations)
statefulSetObject := &Statefulset{Namespace: namespace, Name: name, Available: availablePodNum, Desire: desirePodNum,
App: app, CreateTime: createTime, Status: status, AnnotationStr: string(annotation)}
App: app, CreateTime: createTime, Status: status, Annotation: Annotation{item.Annotations}}
return statefulSetObject
}
func (ctl *StatefulsetCtl) listAndWatch() {
defer func() {
defer close(ctl.aliveChan)
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
......@@ -82,42 +79,45 @@ func (ctl *StatefulsetCtl) listAndWatch() {
}
db = db.CreateTable(&Statefulset{})
k8sClient := client.NewK8sClient()
deoloyList, err := k8sClient.AppsV1beta2().StatefulSets("").List(metaV1.ListOptions{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Apps().V1().StatefulSets().Informer()
lister := kubeInformerFactory.Apps().V1().StatefulSets().Lister()
list, err := lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
}
for _, item := range deoloyList.Items {
obj := ctl.generateObject(item)
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
}
watcher, err := k8sClient.AppsV1beta2().StatefulSets("").Watch(metaV1.ListOptions{})
if err != nil {
glog.Error(err)
return
}
for {
select {
case <-ctl.stopChan:
return
case event := <-watcher.ResultChan():
var tmp Statefulset
if event.Object == nil {
panic("watch timeout, restart statefulset controller")
}
object := event.Object.(*v1beta2.StatefulSet)
if event.Type == watch.Deleted {
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&tmp)
db.Delete(tmp)
break
}
obj := ctl.generateObject(*object)
db.Save(obj)
}
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
object := obj.(*v1.StatefulSet)
mysqlObject := ctl.generateObject(*object)
db.Create(mysqlObject)
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1.StatefulSet)
mysqlObject := ctl.generateObject(*object)
db.Save(mysqlObject)
},
DeleteFunc: func(obj interface{}) {
var item Statefulset
object := obj.(*v1.StatefulSet)
db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item)
db.Delete(item)
},
})
informer.Run(ctl.stopChan)
}
func (ctl *StatefulsetCtl) CountWithConditions(conditions string) int {
......@@ -135,12 +135,6 @@ func (ctl *StatefulsetCtl) ListWithConditions(conditions string, paging *Paging)
listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order)
for index, item := range list {
annotation := make(map[string]string)
json.Unmarshal([]byte(item.AnnotationStr), &annotation)
list[index].Annotation = annotation
list[index].AnnotationStr = ""
}
return total, list, nil
}
......
......@@ -17,19 +17,18 @@ limitations under the License.
package controllers
import (
"encoding/json"
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/api/storage/v1beta1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/api/storage/v1"
"kubesphere.io/kubesphere/pkg/client"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
func (ctl *StorageClassCtl) generateObject(item v1beta1.StorageClass) *StorageClass {
func (ctl *StorageClassCtl) generateObject(item v1.StorageClass) *StorageClass {
name := item.Name
createTime := item.CreationTimestamp.Time
......@@ -42,15 +41,14 @@ func (ctl *StorageClassCtl) generateObject(item v1beta1.StorageClass) *StorageCl
createTime = time.Now()
}
annotation, _ := json.Marshal(item.Annotations)
object := &StorageClass{Name: name, CreateTime: createTime, IsDefault: isDefault, AnnotationStr: string(annotation)}
object := &StorageClass{Name: name, CreateTime: createTime, IsDefault: isDefault, Annotation: Annotation{item.Annotations}}
return object
}
func (ctl *StorageClassCtl) listAndWatch() {
defer func() {
defer close(ctl.aliveChan)
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
......@@ -65,43 +63,46 @@ func (ctl *StorageClassCtl) listAndWatch() {
db = db.CreateTable(&StorageClass{})
k8sClient := client.NewK8sClient()
list, err := k8sClient.StorageV1beta1().StorageClasses().List(meta_v1.ListOptions{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Storage().V1().StorageClasses().Informer()
lister := kubeInformerFactory.Storage().V1().StorageClasses().Lister()
list, err := lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
}
for _, item := range list.Items {
obj := ctl.generateObject(item)
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
}
watcher, err := k8sClient.StorageV1beta1().StorageClasses().Watch(meta_v1.ListOptions{})
if err != nil {
glog.Error(err)
return
}
for {
select {
case <-ctl.stopChan:
return
case event := <-watcher.ResultChan():
var sc StorageClass
if event.Object == nil {
panic("watch timeout, restart storageClass controller")
}
object := event.Object.(*v1beta1.StorageClass)
if event.Type == watch.Deleted {
db.Where("name=?", object.Name).Find(&sc)
db.Delete(sc)
break
}
obj := ctl.generateObject(*object)
db.Save(obj)
}
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
object := obj.(*v1.StorageClass)
mysqlObject := ctl.generateObject(*object)
db.Create(mysqlObject)
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1.StorageClass)
mysqlObject := ctl.generateObject(*object)
db.Save(mysqlObject)
},
DeleteFunc: func(obj interface{}) {
var item StorageClass
object := obj.(*v1.StorageClass)
db.Where("name=?", object.Name).Find(&item)
db.Delete(item)
},
})
informer.Run(ctl.stopChan)
}
func (ctl *StorageClassCtl) CountWithConditions(conditions string) int {
......@@ -121,10 +122,6 @@ func (ctl *StorageClassCtl) ListWithConditions(conditions string, paging *Paging
for index, storageClass := range list {
name := storageClass.Name
annotation := make(map[string]string)
json.Unmarshal([]byte(storageClass.AnnotationStr), &annotation)
list[index].Annotation = annotation
list[index].AnnotationStr = ""
pvcCtl := PvcCtl{CommonAttribute{K8sClient: ctl.K8sClient, DB: ctl.DB}}
list[index].Count = pvcCtl.CountWithConditions(fmt.Sprintf("storage_class=\"%s\"", name))
......
......@@ -19,15 +19,20 @@ package controllers
import (
"time"
"database/sql/driver"
"encoding/json"
"errors"
"github.com/jinzhu/gorm"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)
const (
stopping = "stopped"
running = "running"
updating = "updating"
resyncCircle = 180
Stopped = "stopped"
Running = "running"
Updating = "updating"
tablePods = "pods"
tableDeployments = "deployments"
tableDaemonsets = "daemonsets"
......@@ -53,41 +58,37 @@ const (
StorageClasses = "storage-classes"
)
var ResourceTable = map[string]string{Deployments: tableDeployments, Statefulsets: tableStatefulsets, Daemonsets: tableDaemonsets,
Pods: tablePods, Namespaces: tableNamespaces, Ingresses: tableIngresses, PersistentVolumeClaim: tablePersistentVolumeClaim, Roles: tableRoles,
Services: tableServices, StorageClasses: tableStorageClasses, ClusterRoles: tableClusterRoles}
type Annotation map[string]string
//
//func (annotation *Annotation)Scan(val interface{}) error{
// switch val := val.(type){
// case string:
// return json.Unmarshal([]byte(val), annotation)
// case []byte:
// return json.Unmarshal(val, annotation)
// default:
// return errors.New("not support")
// }
// return nil
//}
//
//func (annotation *Annotation)Value() (driver.Value, error){
// bytes, err := json.Marshal(annotation)
// return string(bytes), err
//}
type Annotation struct {
Values map[string]string `gorm:"type:TEXT"`
}
func (annotation *Annotation) Scan(val interface{}) error {
switch val := val.(type) {
case string:
return json.Unmarshal([]byte(val), annotation)
case []byte:
return json.Unmarshal(val, annotation)
default:
return errors.New("not support")
}
return nil
}
func (annotation Annotation) Value() (driver.Value, error) {
bytes, err := json.Marshal(annotation)
return string(bytes), err
}
type Deployment struct {
Name string `gorm:"primary_key" json:"name"`
Namespace string `gorm:"primary_key" json:"namespace"`
App string `json:"app,omitempty"`
Available int32 `json:"available"`
Desire int32 `json:"desire"`
Status string `json:"status"`
AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"`
Annotation Annotation `gorm:"-" json:"annotation"`
UpdateTime time.Time `gorm:"column:updateTime" json:"updateTime,omitempty"`
Available int32 `json:"available"`
Desire int32 `json:"desire"`
Status string `json:"status"`
Annotation Annotation `json:"annotations"`
UpdateTime time.Time `gorm:"column:updateTime" json:"updateTime,omitempty"`
}
func (Deployment) TableName() string {
......@@ -99,12 +100,11 @@ type Statefulset struct {
Namespace string `gorm:"primary_key" json:"namespace,omitempty"`
App string `json:"app,omitempty"`
Available int32 `json:"available"`
Desire int32 `json:"desire"`
Status string `json:"status"`
AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"`
Annotation Annotation `gorm:"-" json:"annotation"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
Available int32 `json:"available"`
Desire int32 `json:"desire"`
Status string `json:"status"`
Annotation Annotation `json:"annotations"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
}
func (Statefulset) TableName() string {
......@@ -116,13 +116,12 @@ type Daemonset struct {
Namespace string `gorm:"primary_key" json:"namespace,omitempty"`
App string `json:"app,omitempty"`
Available int32 `json:"available"`
Desire int32 `json:"desire"`
Status string `json:"status"`
NodeSelector string `json:"nodeSelector, omitempty"`
AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"`
Annotation Annotation `gorm:"-" json:"annotation"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
Available int32 `json:"available"`
Desire int32 `json:"desire"`
Status string `json:"status"`
NodeSelector string `json:"nodeSelector, omitempty"`
Annotation Annotation `json:"annotations"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
}
func (Daemonset) TableName() string {
......@@ -137,10 +136,9 @@ type Service struct {
VirtualIp string `json:"virtualIp,omitempty"`
ExternalIp string `json:"externalIp,omitempty"`
Ports string `json:"ports,omitempty"`
AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"`
Annotation Annotation `gorm:"-" json:"annotation"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
Ports string `json:"ports,omitempty"`
Annotation Annotation `json:"annotations"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
}
func (Service) TableName() string {
......@@ -153,10 +151,10 @@ type Pvc struct {
Status string `json:"status,omitempty"`
Capacity string `json:"capacity,omitempty"`
AccessMode string `json:"accessMode,omitempty"`
AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"`
Annotation Annotation `gorm:"-" json:"annotation"`
Annotation Annotation `json:"annotations"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
StorageClassName string `gorm:"column:storage_class" json:"storage_class,omitempty"`
InUse bool `gorm:"-" json:"inUse"`
}
func (Pvc) TableName() string {
......@@ -168,8 +166,7 @@ type Ingress struct {
Namespace string `gorm:"primary_key" json:"namespace"`
Ip string `json:"ip,omitempty"`
TlsTermination string `json:"tlsTermination,omitempty"`
AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"`
Annotation Annotation `gorm:"-" json:"annotation"`
Annotation Annotation `json:"annotations"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
}
......@@ -178,24 +175,41 @@ func (Ingress) TableName() string {
}
type Pod struct {
Name string `gorm:"primary_key" json:"name"`
Namespace string `gorm:"primary_key" json:"namespace"`
Status string `json:"status,omitempty"`
Node string `json:"node,omitempty"`
NodeIp string `json:"nodeIp,omitempty"`
PodIp string `json:"podIp,omitempty"`
ContainerStr string `gorm:"type:text" json:",omitempty"`
Containers []Container `json:"containers,omitempty"`
AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"`
Annotation Annotation `gorm:"-" json:"annotation"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
Name string `gorm:"primary_key" json:"name"`
Namespace string `gorm:"primary_key" json:"namespace"`
Status string `json:"status,omitempty"`
Node string `json:"node,omitempty"`
NodeIp string `json:"nodeIp,omitempty"`
PodIp string `json:"podIp,omitempty"`
Containers Containers `gorm:"type:text" json:"containers,omitempty"`
Annotation Annotation `json:"annotations"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
}
type Container struct {
Name string `json:"name"`
Ready bool `json:"ready,omitempty"`
Image string `json:"image"`
Ports []v1.ContainerPort `json:"ports"`
Name string `json:"name"`
Ready bool `json:"ready,omitempty"`
Image string `json:"image"`
Resources v1.ResourceRequirements `json:"resources"`
Ports []v1.ContainerPort `json:"ports"`
}
type Containers []Container
func (containers *Containers) Scan(val interface{}) error {
switch val := val.(type) {
case string:
return json.Unmarshal([]byte(val), containers)
case []byte:
return json.Unmarshal(val, containers)
default:
return errors.New("not support")
}
return nil
}
func (containers Containers) Value() (driver.Value, error) {
bytes, err := json.Marshal(containers)
return string(bytes), err
}
func (Pod) TableName() string {
......@@ -203,11 +217,10 @@ func (Pod) TableName() string {
}
type Role struct {
Name string `gorm:"primary_key" json:"name"`
Namespace string `gorm:"primary_key" json:"namespace"`
AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"`
Annotation Annotation `gorm:"-" json:"annotation"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
Name string `gorm:"primary_key" json:"name"`
Namespace string `gorm:"primary_key" json:"namespace"`
Annotation Annotation `json:"annotations"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
}
func (Role) TableName() string {
......@@ -215,10 +228,9 @@ func (Role) TableName() string {
}
type ClusterRole struct {
Name string `gorm:"primary_key" json:"name"`
AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"`
Annotation Annotation `gorm:"-" json:"annotation"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
Name string `gorm:"primary_key" json:"name"`
Annotation Annotation `json:"annotations"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
}
func (ClusterRole) TableName() string {
......@@ -230,10 +242,10 @@ type Namespace struct {
Creator string `json:"creator,omitempty"`
Status string `json:"status"`
Descrition string `json:"description,omitempty"`
AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"`
Annotation Annotation `gorm:"-" json:"annotation"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
Descrition string `json:"description,omitempty"`
Annotation Annotation `json:"annotations"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
Usaeg v1.ResourceList `gorm:"-" json:"usage,omitempty"`
}
func (Namespace) TableName() string {
......@@ -241,13 +253,12 @@ func (Namespace) TableName() string {
}
type StorageClass struct {
Name string `gorm:"primary_key" json:"name"`
Creator string `json:"creator,omitempty"`
AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"`
Annotation Annotation `gorm:"-" json:"annotation"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
IsDefault bool `json:"default"`
Count int `json:"count"`
Name string `gorm:"primary_key" json:"name"`
Creator string `json:"creator,omitempty"`
Annotation Annotation `json:"annotations"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
IsDefault bool `json:"default"`
Count int `json:"count"`
}
func (StorageClass) TableName() string {
......@@ -262,7 +273,8 @@ type Controller interface {
listAndWatch()
chanStop() chan struct{}
chanAlive() chan struct{}
Count(conditions string) int
Count(namespace string) int
CountWithConditions(condition string) int
ListWithConditions(condition string, paging *Paging) (int, interface{}, error)
}
......@@ -326,36 +338,3 @@ type RoleCtl struct {
type ClusterRoleCtl struct {
CommonAttribute
}
func listWithConditions(db *gorm.DB, total *int, object, list interface{}, conditions string, paging *Paging, order string) {
if len(conditions) == 0 {
db.Model(object).Count(total)
} else {
db.Model(object).Where(conditions).Count(total)
}
if paging != nil {
if len(conditions) > 0 {
db.Where(conditions).Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list)
} else {
db.Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list)
}
} else {
if len(conditions) > 0 {
db.Where(conditions).Order(order).Find(list)
} else {
db.Order(order).Find(list)
}
}
}
func countWithConditions(db *gorm.DB, conditions string, object interface{}) int {
var count int
if len(conditions) == 0 {
db.Model(object).Count(&count)
} else {
db.Model(object).Where(conditions).Count(&count)
}
return count
}
......@@ -134,24 +134,31 @@ func generateConditionAndPaging(conditions map[string]string, paging map[string]
}
type workLoadStatus struct {
NameSpace string `json:"namespace"`
Count map[string]int `json:"data"`
Items map[string]interface{}
NameSpace string `json:"namespace"`
Count map[string]int `json:"data"`
Items map[string]interface{} `json:"items,omitempty"`
}
func GetNamespacesResourceStatus(namespace string) (*workLoadStatus, error) {
res := workLoadStatus{Count: make(map[string]int), NameSpace: namespace, Items: make(map[string]interface{})}
var status *ResourceList
var err error
for _, resource := range []string{controllers.Deployments, controllers.Statefulsets, controllers.Daemonsets} {
status, err := ListResource(resource, "status=updating", "")
if len(namespace) > 0 {
status, err = ListResource(resource, fmt.Sprintf("status=%s,namespace=%s", controllers.Updating, namespace), "")
} else {
status, err = ListResource(resource, fmt.Sprintf("status=%s", controllers.Updating), "")
}
if err != nil {
return nil, err
}
count := status.Total
items := status.Items
//items := status.Items
res.Count[resource] = count
res.Items[resource] = items
//res.Items[resource] = items
}
return &res, nil
......
......@@ -11,6 +11,10 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// the code is mainly from:
// https://github.com/kubernetes/dashboard/blob/master/src/app/backend/handler/terminal.go
// thanks to the related developer
package models
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册