From 67cbff464ff22a1a332a30db78c05f21634d6733 Mon Sep 17 00:00:00 2001 From: Duan Jiong Date: Thu, 25 Feb 2021 18:53:08 +0800 Subject: [PATCH] fix ippool status statistics and delete ippool label while workspace is deleted sync default ippool to namespace annotation Signed-off-by: Duan Jiong --- cmd/controller-manager/app/controllers.go | 8 +- .../network/ippool/ippool_controller.go | 269 ++++++++++++++---- .../network/ippool/ippool_controller_test.go | 17 +- .../client/network/ippool/calico/provider.go | 244 ++++++++-------- pkg/simple/client/network/ippool/ipam/ipam.go | 4 + pkg/simple/client/network/ippool/provider.go | 12 +- 6 files changed, 370 insertions(+), 184 deletions(-) diff --git a/cmd/controller-manager/app/controllers.go b/cmd/controller-manager/app/controllers.go index 67d009e7..27e1413b 100644 --- a/cmd/controller-manager/app/controllers.go +++ b/cmd/controller-manager/app/controllers.go @@ -253,13 +253,9 @@ func addControllers( } var ippoolController manager.Runnable - ippoolProvider := ippoolclient.NewProvider(kubernetesInformer.Core().V1().Pods(), client.KubeSphere(), client.Kubernetes(), networkOptions.IPPoolType, options) + ippoolProvider := ippoolclient.NewProvider(kubernetesInformer, client.KubeSphere(), client.Kubernetes(), networkOptions.IPPoolType, options) if ippoolProvider != nil { - ippoolController = ippool.NewIPPoolController(kubesphereInformer.Network().V1alpha1().IPPools(), - kubesphereInformer.Network().V1alpha1().IPAMBlocks(), - client.Kubernetes(), - client.KubeSphere(), - ippoolProvider) + ippoolController = ippool.NewIPPoolController(kubesphereInformer, kubernetesInformer, client.Kubernetes(), client.KubeSphere(), ippoolProvider) } controllers := map[string]manager.Runnable{ diff --git a/pkg/controller/network/ippool/ippool_controller.go b/pkg/controller/network/ippool/ippool_controller.go index bbf1fe7c..24b3a5df 100644 --- a/pkg/controller/network/ippool/ippool_controller.go +++ b/pkg/controller/network/ippool/ippool_controller.go @@ -19,32 +19,36 @@ package ippool import ( "context" "fmt" - "reflect" - "time" - cnet "github.com/projectcalico/libcalico-go/lib/net" - podv1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + k8sinformers "k8s.io/client-go/informers" + coreinfomers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" networkv1alpha1 "kubesphere.io/kubesphere/pkg/apis/network/v1alpha1" + tenantv1alpha1 "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha1" kubesphereclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned" + ksinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions" networkInformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/network/v1alpha1" + tenantv1alpha1informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/tenant/v1alpha1" + "kubesphere.io/kubesphere/pkg/constants" "kubesphere.io/kubesphere/pkg/controller/network/utils" "kubesphere.io/kubesphere/pkg/controller/network/webhooks" "kubesphere.io/kubesphere/pkg/simple/client/network/ippool" + "reflect" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "time" ) var ( @@ -61,6 +65,13 @@ type IPPoolController struct { ippoolSynced cache.InformerSynced ippoolQueue workqueue.RateLimitingInterface + wsInformer tenantv1alpha1informers.WorkspaceInformer + wsSynced cache.InformerSynced + + nsInformer coreinfomers.NamespaceInformer + nsSynced cache.InformerSynced + nsQueue workqueue.RateLimitingInterface + ipamblockInformer networkInformer.IPAMBlockInformer ipamblockSynced cache.InformerSynced @@ -68,31 +79,25 @@ type IPPoolController struct { kubesphereClient kubesphereclient.Interface } -func (c *IPPoolController) ippoolHandle(obj interface{}) { +func (c *IPPoolController) enqueueIPPools(obj interface{}) { pool, ok := obj.(*networkv1alpha1.IPPool) if !ok { utilruntime.HandleError(fmt.Errorf("IPPool informer returned non-ippool object: %#v", obj)) return } - key, err := cache.MetaNamespaceKeyFunc(pool) - if err != nil { - utilruntime.HandleError(fmt.Errorf("couldn't get key for ippool %#v: %v", pool, err)) - return - } - if utils.NeedToAddFinalizer(pool, networkv1alpha1.IPPoolFinalizer) || utils.IsDeletionCandidate(pool, networkv1alpha1.IPPoolFinalizer) { - c.ippoolQueue.Add(key) - } + c.ippoolQueue.Add(pool.Name) } func (c *IPPoolController) addFinalizer(pool *networkv1alpha1.IPPool) error { clone := pool.DeepCopy() controllerutil.AddFinalizer(clone, networkv1alpha1.IPPoolFinalizer) - clone.Labels = map[string]string{ - networkv1alpha1.IPPoolNameLabel: clone.Name, - networkv1alpha1.IPPoolTypeLabel: clone.Spec.Type, - networkv1alpha1.IPPoolIDLabel: fmt.Sprintf("%d", clone.ID()), + if clone.Labels == nil { + clone.Labels = make(map[string]string) } + clone.Labels[networkv1alpha1.IPPoolNameLabel] = clone.Name + clone.Labels[networkv1alpha1.IPPoolTypeLabel] = clone.Spec.Type + clone.Labels[networkv1alpha1.IPPoolIDLabel] = fmt.Sprintf("%d", clone.ID()) pool, err := c.kubesphereClient.NetworkV1alpha1().IPPools().Update(context.TODO(), clone, metav1.UpdateOptions{}) if err != nil { klog.V(3).Infof("Error adding finalizer to pool %s: %v", pool.Name, err) @@ -116,12 +121,15 @@ func (c *IPPoolController) removeFinalizer(pool *networkv1alpha1.IPPool) error { func (c *IPPoolController) ValidateCreate(obj runtime.Object) error { b := obj.(*networkv1alpha1.IPPool) - _, cidr, err := cnet.ParseCIDR(b.Spec.CIDR) + ip, cidr, err := cnet.ParseCIDR(b.Spec.CIDR) if err != nil { return fmt.Errorf("invalid cidr") } size, _ := cidr.Mask.Size() + if ip.IP.To4() != nil && size == 32 { + return fmt.Errorf("the cidr mask must be less than 32") + } if b.Spec.BlockSize > 0 && b.Spec.BlockSize < size { return fmt.Errorf("the blocksize should be larger than the cidr mask") } @@ -163,6 +171,25 @@ func (c *IPPoolController) ValidateCreate(obj runtime.Object) error { return nil } +func (c *IPPoolController) validateDefaultIPPool(p *networkv1alpha1.IPPool) error { + pools, err := c.kubesphereClient.NetworkV1alpha1().IPPools().List(context.TODO(), metav1.ListOptions{ + LabelSelector: labels.SelectorFromSet( + labels.Set{ + networkv1alpha1.IPPoolDefaultLabel: "", + }).String(), + }) + if err != nil { + return err + } + + poolLen := len(pools.Items) + if poolLen != 1 || pools.Items[0].Name != p.Name { + return nil + } + + return fmt.Errorf("Must ensure that there is at least one default ippool") +} + func (c *IPPoolController) ValidateUpdate(old runtime.Object, new runtime.Object) error { oldP := old.(*networkv1alpha1.IPPool) newP := new.(*networkv1alpha1.IPPool) @@ -183,6 +210,15 @@ func (c *IPPoolController) ValidateUpdate(old runtime.Object, new runtime.Object return fmt.Errorf("ippool rangeEnd/rangeStart cannot be modified") } + _, defaultOld := oldP.Labels[networkv1alpha1.IPPoolDefaultLabel] + _, defaultNew := newP.Labels[networkv1alpha1.IPPoolDefaultLabel] + if !defaultNew && defaultOld != defaultNew { + err := c.validateDefaultIPPool(newP) + if err != nil { + return err + } + } + return nil } @@ -193,7 +229,7 @@ func (c *IPPoolController) ValidateDelete(obj runtime.Object) error { return fmt.Errorf("ippool is in use, please remove the workload before deleting") } - return nil + return c.validateDefaultIPPool(p) } func (c *IPPoolController) disableIPPool(old *networkv1alpha1.IPPool) error { @@ -204,7 +240,7 @@ func (c *IPPoolController) disableIPPool(old *networkv1alpha1.IPPool) error { clone := old.DeepCopy() clone.Spec.Disabled = true - old, err := c.kubesphereClient.NetworkV1alpha1().IPPools().Update(context.TODO(), clone, metav1.UpdateOptions{}) + _, err := c.kubesphereClient.NetworkV1alpha1().IPPools().Update(context.TODO(), clone, metav1.UpdateOptions{}) return err } @@ -305,19 +341,20 @@ func (c *IPPoolController) Run(workers int, stopCh <-chan struct{}) error { klog.Info("starting ippool controller") defer klog.Info("shutting down ippool controller") - if !cache.WaitForCacheSync(stopCh, c.ippoolSynced, c.ipamblockSynced) { + if !cache.WaitForCacheSync(stopCh, c.ippoolSynced, c.ipamblockSynced, c.wsSynced, c.nsSynced) { return fmt.Errorf("failed to wait for caches to sync") } for i := 0; i < workers; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runIPPoolWorker, time.Second, stopCh) + go wait.Until(c.runNSWorker, time.Second, stopCh) } <-stopCh return nil } -func (c *IPPoolController) runWorker() { +func (c *IPPoolController) runIPPoolWorker() { for c.processIPPoolItem() { } } @@ -329,13 +366,7 @@ func (c *IPPoolController) processIPPoolItem() bool { } defer c.ippoolQueue.Done(key) - _, name, err := cache.SplitMetaNamespaceKey(key.(string)) - if err != nil { - utilruntime.HandleError(fmt.Errorf("error parsing ippool key %q: %v", key, err)) - return true - } - - delay, err := c.processIPPool(name) + delay, err := c.processIPPool(key.(string)) if err == nil { c.ippoolQueue.Forget(key) return true @@ -350,7 +381,63 @@ func (c *IPPoolController) processIPPoolItem() bool { return true } -func (c *IPPoolController) ipamblockHandle(obj interface{}) { +func (c *IPPoolController) runNSWorker() { + for c.processNSItem() { + } +} + +func (c *IPPoolController) processNS(name string) error { + ns, err := c.nsInformer.Lister().Get(name) + if apierrors.IsNotFound(err) { + return nil + } + + var poolsName []string + if ns.Labels != nil && ns.Labels[constants.WorkspaceLabelKey] != "" { + pools, err := c.ippoolInformer.Lister().List(labels.SelectorFromSet(labels.Set{ + networkv1alpha1.IPPoolDefaultLabel: "", + })) + if err != nil { + return err + } + + for _, pool := range pools { + poolsName = append(poolsName, pool.Name) + } + } + + clone := ns.DeepCopy() + err = c.provider.UpdateNamespace(clone, poolsName) + if err != nil { + return err + } + if reflect.DeepEqual(clone, ns) { + return nil + } + + _, err = c.client.CoreV1().Namespaces().Update(context.TODO(), clone, metav1.UpdateOptions{}) + return err +} + +func (c *IPPoolController) processNSItem() bool { + key, quit := c.nsQueue.Get() + if quit { + return false + } + defer c.nsQueue.Done(key) + + err := c.processNS(key.(string)) + if err == nil { + c.nsQueue.Forget(key) + return true + } + + c.nsQueue.AddRateLimited(key) + utilruntime.HandleError(fmt.Errorf("error processing ns %v (will retry): %v", key, err)) + return true +} + +func (c *IPPoolController) enqueueIPAMBlocks(obj interface{}) { block, ok := obj.(*networkv1alpha1.IPAMBlock) if !ok { return @@ -360,9 +447,47 @@ func (c *IPPoolController) ipamblockHandle(obj interface{}) { c.ippoolQueue.Add(poolName) } +func (c *IPPoolController) enqueueWorkspace(obj interface{}) { + wk, ok := obj.(*tenantv1alpha1.Workspace) + if !ok { + return + } + + pools, err := c.ippoolInformer.Lister().List(labels.SelectorFromSet(labels.Set{ + constants.WorkspaceLabelKey: wk.Name, + })) + if err != nil { + klog.Errorf("failed to list ippools by worksapce %s, err=%v", wk.Name, err) + } + + for _, pool := range pools { + c.ippoolQueue.Add(pool.Name) + } +} + +func (c *IPPoolController) enqueueNamespace(old interface{}, new interface{}) { + workspaceOld := "" + if old != nil { + nsOld := old.(*corev1.Namespace) + if nsOld.Labels != nil { + workspaceOld = nsOld.Labels[constants.WorkspaceLabelKey] + } + } + + nsNew := new.(*corev1.Namespace) + workspaceNew := "" + if nsNew.Labels != nil { + workspaceNew = nsNew.Labels[constants.WorkspaceLabelKey] + } + + if workspaceOld != workspaceNew { + c.nsQueue.Add(nsNew.Name) + } +} + func NewIPPoolController( - ippoolInformer networkInformer.IPPoolInformer, - ipamblockInformer networkInformer.IPAMBlockInformer, + kubesphereInformers ksinformers.SharedInformerFactory, + kubernetesInformers k8sinformers.SharedInformerFactory, client clientset.Interface, kubesphereClient kubesphereclient.Interface, provider ippool.Provider) *IPPoolController { @@ -371,43 +496,71 @@ func NewIPPoolController( broadcaster.StartLogging(func(format string, args ...interface{}) { klog.Info(fmt.Sprintf(format, args)) }) - broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events("")}) - recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ippool-controller"}) + broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: client.CoreV1().Events("")}) + recorder := broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "ippool-controller"}) c := &IPPoolController{ - eventBroadcaster: broadcaster, - eventRecorder: recorder, - ippoolInformer: ippoolInformer, - ippoolSynced: ippoolInformer.Informer().HasSynced, - ippoolQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ippool"), - ipamblockInformer: ipamblockInformer, - ipamblockSynced: ipamblockInformer.Informer().HasSynced, - client: client, - kubesphereClient: kubesphereClient, - provider: provider, - } - - ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.ippoolHandle, + eventBroadcaster: broadcaster, + eventRecorder: recorder, + ippoolQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ippool"), + nsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ippool-ns"), + client: client, + kubesphereClient: kubesphereClient, + provider: provider, + } + c.ippoolInformer = kubesphereInformers.Network().V1alpha1().IPPools() + c.ippoolSynced = c.ippoolInformer.Informer().HasSynced + c.ipamblockInformer = kubesphereInformers.Network().V1alpha1().IPAMBlocks() + c.ipamblockSynced = c.ipamblockInformer.Informer().HasSynced + c.wsInformer = kubesphereInformers.Tenant().V1alpha1().Workspaces() + c.wsSynced = c.wsInformer.Informer().HasSynced + c.nsInformer = kubernetesInformers.Core().V1().Namespaces() + c.nsSynced = c.nsInformer.Informer().HasSynced + + c.ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueueIPPools, UpdateFunc: func(old, new interface{}) { - c.ippoolHandle(new) + _, defaultOld := old.(*networkv1alpha1.IPPool).Labels[networkv1alpha1.IPPoolDefaultLabel] + _, defaultNew := new.(*networkv1alpha1.IPPool).Labels[networkv1alpha1.IPPoolDefaultLabel] + if defaultOld != defaultNew { + nss, err := c.nsInformer.Lister().List(labels.Everything()) + if err != nil { + return + } + + for _, ns := range nss { + c.enqueueNamespace(nil, ns) + } + } + c.enqueueIPPools(new) }, }) //just for update ippool status - ipamblockInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.ipamblockHandle, + c.ipamblockInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueueIPAMBlocks, UpdateFunc: func(old, new interface{}) { - c.ipamblockHandle(new) + c.enqueueIPAMBlocks(new) + }, + DeleteFunc: c.enqueueIPAMBlocks, + }) + + c.wsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: c.enqueueWorkspace, + }) + + c.nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(new interface{}) { + c.enqueueNamespace(nil, new) }, - DeleteFunc: c.ipamblockHandle, + UpdateFunc: c.enqueueNamespace, }) //register ippool webhook webhooks.RegisterValidator(networkv1alpha1.SchemeGroupVersion.WithKind(networkv1alpha1.ResourceKindIPPool).String(), &webhooks.ValidatorWrap{Obj: &networkv1alpha1.IPPool{}, Helper: c}) - webhooks.RegisterDefaulter(podv1.SchemeGroupVersion.WithKind("Pod").String(), - &webhooks.DefaulterWrap{Obj: &podv1.Pod{}, Helper: provider}) + webhooks.RegisterDefaulter(corev1.SchemeGroupVersion.WithKind("Pod").String(), + &webhooks.DefaulterWrap{Obj: &corev1.Pod{}, Helper: provider}) return c } diff --git a/pkg/controller/network/ippool/ippool_controller_test.go b/pkg/controller/network/ippool/ippool_controller_test.go index c3dc4430..6740af47 100644 --- a/pkg/controller/network/ippool/ippool_controller_test.go +++ b/pkg/controller/network/ippool/ippool_controller_test.go @@ -25,6 +25,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sinformers "k8s.io/client-go/informers" k8sfake "k8s.io/client-go/kubernetes/fake" "k8s.io/klog" "kubesphere.io/kubesphere/pkg/apis/network/v1alpha1" @@ -45,6 +46,10 @@ func TestIPPoolSuit(t *testing.T) { RunSpecs(t, "IPPool Suite") } +var ( + alwaysReady = func() bool { return true } +) + var _ = Describe("test ippool", func() { pool := &v1alpha1.IPPool{ TypeMeta: v1.TypeMeta{}, @@ -60,16 +65,16 @@ var _ = Describe("test ippool", func() { ksclient := ksfake.NewSimpleClientset() k8sclinet := k8sfake.NewSimpleClientset() - p := ippool.NewProvider(nil, ksclient, k8sclinet, v1alpha1.IPPoolTypeLocal, nil) - ipamClient := ipam.NewIPAMClient(ksclient, v1alpha1.VLAN) - ksInformer := ksinformers.NewSharedInformerFactory(ksclient, 0) - ippoolInformer := ksInformer.Network().V1alpha1().IPPools() - ipamblockInformer := ksInformer.Network().V1alpha1().IPAMBlocks() - c := NewIPPoolController(ippoolInformer, ipamblockInformer, k8sclinet, ksclient, p) + k8sInformer := k8sinformers.NewSharedInformerFactory(k8sclinet, 0) + + p := ippool.NewProvider(k8sInformer, ksclient, k8sclinet, v1alpha1.IPPoolTypeLocal, nil) + ipamClient := ipam.NewIPAMClient(ksclient, v1alpha1.VLAN) + c := NewIPPoolController(ksInformer, k8sInformer, k8sclinet, ksclient, p) stopCh := make(chan struct{}) go ksInformer.Start(stopCh) + go k8sInformer.Start(stopCh) go c.Start(stopCh) It("test create ippool", func() { diff --git a/pkg/simple/client/network/ippool/calico/provider.go b/pkg/simple/client/network/ippool/calico/provider.go index b4d62220..e3081da7 100644 --- a/pkg/simple/client/network/ippool/calico/provider.go +++ b/pkg/simple/client/network/ippool/calico/provider.go @@ -21,6 +21,7 @@ import ( "encoding/json" "errors" "fmt" + k8sinformers "k8s.io/client-go/informers" "net" "time" @@ -38,7 +39,6 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" "k8s.io/klog" "kubesphere.io/kubesphere/pkg/apis/network/calicov3" @@ -94,7 +94,7 @@ type provider struct { options Options } -func (c provider) CreateIPPool(pool *v1alpha1.IPPool) error { +func (p *provider) CreateIPPool(pool *v1alpha1.IPPool) error { calicoPool := &calicov3.IPPool{ TypeMeta: v1.TypeMeta{}, ObjectMeta: v1.ObjectMeta{ @@ -104,9 +104,9 @@ func (c provider) CreateIPPool(pool *v1alpha1.IPPool) error { CIDR: pool.Spec.CIDR, Disabled: pool.Spec.Disabled, NodeSelector: "all()", - VXLANMode: v3.VXLANMode(c.options.VXLANMode), - IPIPMode: v3.IPIPMode(c.options.IPIPMode), - NATOutgoing: c.options.NATOutgoing, + VXLANMode: v3.VXLANMode(p.options.VXLANMode), + IPIPMode: v3.IPIPMode(p.options.IPIPMode), + NATOutgoing: p.options.NATOutgoing, }, } @@ -121,7 +121,7 @@ func (c provider) CreateIPPool(pool *v1alpha1.IPPool) error { klog.Warningf("cannot set reference for calico ippool %s, err=%v", pool.Name, err) } - _, err = c.client.CrdCalicov3().IPPools().Create(context.TODO(), calicoPool, v1.CreateOptions{}) + _, err = p.client.CrdCalicov3().IPPools().Create(context.TODO(), calicoPool, v1.CreateOptions{}) if k8serrors.IsAlreadyExists(err) { return nil } @@ -129,19 +129,22 @@ func (c provider) CreateIPPool(pool *v1alpha1.IPPool) error { return err } -func (c provider) UpdateIPPool(pool *v1alpha1.IPPool) error { +func (p *provider) UpdateIPPool(pool *v1alpha1.IPPool) error { return nil } -func (c provider) GetIPPoolStats(pool *v1alpha1.IPPool) (*v1alpha1.IPPool, error) { +func (p *provider) GetIPPoolStats(pool *v1alpha1.IPPool) (*v1alpha1.IPPool, error) { stats := pool.DeepCopy() - calicoPool, err := c.client.CrdCalicov3().IPPools().Get(context.TODO(), pool.Name, v1.GetOptions{}) + calicoPool, err := p.client.CrdCalicov3().IPPools().Get(context.TODO(), pool.Name, v1.GetOptions{}) if err != nil { return nil, err } - blocks, err := c.listBlocks(calicoPool) + blocks, err := p.block.Lister().List(labels.SelectorFromSet( + labels.Set{ + v1alpha1.IPPoolNameLabel: calicoPool.Name, + })) if err != nil { return nil, err } @@ -152,9 +155,7 @@ func (c provider) GetIPPoolStats(pool *v1alpha1.IPPool) (*v1alpha1.IPPool, error stats.Status.Synced = true stats.Status.Allocations = 0 stats.Status.Reserved = 0 - if stats.Status.Workspaces == nil { - stats.Status.Workspaces = make(map[string]v1alpha1.WorkspaceStatus) - } + stats.Status.Workspaces = make(map[string]v1alpha1.WorkspaceStatus) if len(blocks) <= 0 { stats.Status.Unallocated = pool.NumAddresses() @@ -168,23 +169,20 @@ func (c provider) GetIPPoolStats(pool *v1alpha1.IPPool) (*v1alpha1.IPPool, error stats.Status.Unallocated = stats.Status.Capacity - stats.Status.Allocations - stats.Status.Reserved } - wks, err := c.getAssociatedWorkspaces(pool) + wks, err := p.getAssociatedWorkspaces(pool) if err != nil { return nil, err } for _, wk := range wks { - status, err := c.getWorkspaceStatus(wk, pool.GetName()) + status, err := p.getWorkspaceStatus(wk, pool.GetName()) if err != nil { return nil, err } - stats.Status.Workspaces[wk] = *status - } - - for name, wk := range stats.Status.Workspaces { - if wk.Allocations == 0 { - delete(stats.Status.Workspaces, name) + if status.Allocations == 0 { + continue } + stats.Status.Workspaces[wk] = *status } return stats, nil @@ -195,16 +193,18 @@ func setBlockAffiDeletion(c calicoset.Interface, blockAffi *calicov3.BlockAffini return nil } - blockAffi.Spec.State = string(model.StatePendingDeletion) - _, err := c.CrdCalicov3().BlockAffinities().Update(context.TODO(), blockAffi, v1.UpdateOptions{}) + clone := blockAffi.DeepCopy() + clone.Spec.State = string(model.StatePendingDeletion) + _, err := c.CrdCalicov3().BlockAffinities().Update(context.TODO(), clone, v1.UpdateOptions{}) return err } func deleteBlockAffi(c calicoset.Interface, blockAffi *calicov3.BlockAffinity) error { trueStr := fmt.Sprintf("%t", true) if blockAffi.Spec.Deleted != trueStr { - blockAffi.Spec.Deleted = trueStr - _, err := c.CrdCalicov3().BlockAffinities().Update(context.TODO(), blockAffi, v1.UpdateOptions{}) + clone := blockAffi.DeepCopy() + clone.Spec.Deleted = trueStr + _, err := c.CrdCalicov3().BlockAffinities().Update(context.TODO(), clone, v1.UpdateOptions{}) if err != nil { return err } @@ -218,10 +218,10 @@ func deleteBlockAffi(c calicoset.Interface, blockAffi *calicov3.BlockAffinity) e return nil } -func (c provider) doBlockAffis(pool *calicov3.IPPool, do func(calicoset.Interface, *calicov3.BlockAffinity) error) error { +func (p *provider) doBlockAffis(pool *calicov3.IPPool, do func(calicoset.Interface, *calicov3.BlockAffinity) error) error { _, cidrNet, _ := cnet.ParseCIDR(pool.Spec.CIDR) - blockAffis, err := c.client.CrdCalicov3().BlockAffinities().List(context.TODO(), v1.ListOptions{}) + blockAffis, err := p.client.CrdCalicov3().BlockAffinities().List(context.TODO(), v1.ListOptions{}) if err != nil { return err } @@ -232,7 +232,7 @@ func (c provider) doBlockAffis(pool *calicov3.IPPool, do func(calicoset.Interfac continue } - err = do(c.client, &blockAffi) + err = do(p.client, &blockAffi) if err != nil { return err } @@ -241,34 +241,17 @@ func (c provider) doBlockAffis(pool *calicov3.IPPool, do func(calicoset.Interfac return nil } -func (c provider) listBlocks(pool *calicov3.IPPool) ([]calicov3.IPAMBlock, error) { - _, cidrNet, _ := cnet.ParseCIDR(pool.Spec.CIDR) - - blocks, err := c.client.CrdCalicov3().IPAMBlocks().List(context.TODO(), v1.ListOptions{}) - if err != nil { - return nil, err - } - - var result []calicov3.IPAMBlock - for _, block := range blocks.Items { - _, blockCIDR, _ := cnet.ParseCIDR(block.Spec.CIDR) - if !cidrNet.IsNetOverlap(blockCIDR.IPNet) { - continue - } - result = append(result, block) - } - - return result, nil -} - -func (c provider) doBlocks(pool *calicov3.IPPool, do func(calicoset.Interface, *calicov3.IPAMBlock) error) error { - blocks, err := c.listBlocks(pool) +func (p *provider) doBlocks(pool *calicov3.IPPool, do func(calicoset.Interface, *calicov3.IPAMBlock) error) error { + blocks, err := p.block.Lister().List(labels.SelectorFromSet( + labels.Set{ + v1alpha1.IPPoolNameLabel: pool.Name, + })) if err != nil { return err } for _, block := range blocks { - err = do(c.client, &block) + err = do(p.client, block) if err != nil { return err } @@ -280,8 +263,9 @@ func (c provider) doBlocks(pool *calicov3.IPPool, do func(calicoset.Interface, * func deleteBlock(c calicoset.Interface, block *calicov3.IPAMBlock) error { if block.Empty() { if !block.Spec.Deleted { - block.Spec.Deleted = true - _, err := c.CrdCalicov3().IPAMBlocks().Update(context.TODO(), block, v1.UpdateOptions{}) + clone := block.DeepCopy() + clone.Spec.Deleted = true + _, err := c.CrdCalicov3().IPAMBlocks().Update(context.TODO(), clone, v1.UpdateOptions{}) if err != nil { return err } @@ -297,7 +281,7 @@ func deleteBlock(c calicoset.Interface, block *calicov3.IPAMBlock) error { return nil } -func (c provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) { +func (p *provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) { // Deleting a pool requires a little care because of existing endpoints // using IP addresses allocated in the pool. We do the deletion in // the following steps: @@ -306,7 +290,7 @@ func (c provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) { // - delete the pool // Get the pool so that we can find the CIDR associated with it. - calicoPool, err := c.client.CrdCalicov3().IPPools().Get(context.TODO(), pool.Name, v1.GetOptions{}) + calicoPool, err := p.client.CrdCalicov3().IPPools().Get(context.TODO(), pool.Name, v1.GetOptions{}) if err != nil { if k8serrors.IsNotFound(err) { return true, nil @@ -318,14 +302,14 @@ func (c provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) { if !calicoPool.Spec.Disabled { calicoPool.Spec.Disabled = true - calicoPool, err = c.client.CrdCalicov3().IPPools().Update(context.TODO(), calicoPool, v1.UpdateOptions{}) + calicoPool, err = p.client.CrdCalicov3().IPPools().Update(context.TODO(), calicoPool, v1.UpdateOptions{}) if err != nil { return false, err } } //If the address pool is being used, we return, avoiding deletions that cause other problems. - stat, err := c.GetIPPoolStats(pool) + stat, err := p.GetIPPoolStats(pool) if err != nil { return false, err } @@ -334,13 +318,13 @@ func (c provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) { } //set blockaffi to pendingdelete - err = c.doBlockAffis(calicoPool, setBlockAffiDeletion) + err = p.doBlockAffis(calicoPool, setBlockAffiDeletion) if err != nil { return false, err } //delete block - err = c.doBlocks(calicoPool, deleteBlock) + err = p.doBlocks(calicoPool, deleteBlock) if err != nil { if errors.Is(err, ErrBlockInuse) { return false, nil @@ -349,13 +333,13 @@ func (c provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) { } //delete blockaffi - err = c.doBlockAffis(calicoPool, deleteBlockAffi) + err = p.doBlockAffis(calicoPool, deleteBlockAffi) if err != nil { return false, err } //delete calico ippool - err = c.client.CrdCalicov3().IPPools().Delete(context.TODO(), calicoPool.Name, v1.DeleteOptions{}) + err = p.client.CrdCalicov3().IPPools().Delete(context.TODO(), calicoPool.Name, v1.DeleteOptions{}) if err != nil { return false, err } @@ -365,14 +349,14 @@ func (c provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) { } //Synchronizing address pools at boot time -func (c provider) syncIPPools() error { - calicoPools, err := c.client.CrdCalicov3().IPPools().List(context.TODO(), v1.ListOptions{}) +func (p *provider) syncIPPools() error { + calicoPools, err := p.client.CrdCalicov3().IPPools().List(context.TODO(), v1.ListOptions{}) if err != nil { klog.V(4).Infof("syncIPPools: cannot list calico ippools, err=%v", err) return err } - pools, err := c.ksclient.NetworkV1alpha1().IPPools().List(context.TODO(), v1.ListOptions{}) + pools, err := p.ksclient.NetworkV1alpha1().IPPools().List(context.TODO(), v1.ListOptions{}) if err != nil { klog.V(4).Infof("syncIPPools: cannot list kubesphere ippools, err=%v", err) return err @@ -402,7 +386,7 @@ func (c provider) syncIPPools() error { Status: v1alpha1.IPPoolStatus{}, } - _, err = c.ksclient.NetworkV1alpha1().IPPools().Create(context.TODO(), pool, v1.CreateOptions{}) + _, err = p.ksclient.NetworkV1alpha1().IPPools().Create(context.TODO(), pool, v1.CreateOptions{}) if err != nil { klog.V(4).Infof("syncIPPools: cannot create kubesphere ippools, err=%v", err) return err @@ -413,7 +397,7 @@ func (c provider) syncIPPools() error { return nil } -func (p provider) getAssociatedWorkspaces(pool *v1alpha1.IPPool) ([]string, error) { +func (p *provider) getAssociatedWorkspaces(pool *v1alpha1.IPPool) ([]string, error) { var result []string poolLabel := constants.WorkspaceLabelKey @@ -430,10 +414,19 @@ func (p provider) getAssociatedWorkspaces(pool *v1alpha1.IPPool) ([]string, erro return result, nil } - return append(result, pool.GetLabels()[poolLabel]), nil + wk := pool.GetLabels()[poolLabel] + _, err := p.ksclient.TenantV1alpha1().Workspaces().Get(context.TODO(), wk, v1.GetOptions{}) + if k8serrors.IsNotFound(err) { + clone := pool.DeepCopy() + delete(clone.GetLabels(), poolLabel) + _, err := p.ksclient.NetworkV1alpha1().IPPools().Update(context.TODO(), clone, v1.UpdateOptions{}) + return nil, err + } + + return append(result, wk), err } -func (p provider) getWorkspaceStatus(name string, poolName string) (*v1alpha1.WorkspaceStatus, error) { +func (p *provider) getWorkspaceStatus(name string, poolName string) (*v1alpha1.WorkspaceStatus, error) { var result v1alpha1.WorkspaceStatus namespaces, err := p.k8sclient.CoreV1().Namespaces().List(context.TODO(), v1.ListOptions{ @@ -448,12 +441,19 @@ func (p provider) getWorkspaceStatus(name string, poolName string) (*v1alpha1.Wo } for _, ns := range namespaces.Items { - pods, err := p.k8sclient.CoreV1().Pods(ns.GetName()).List(context.TODO(), v1.ListOptions{}) + pods, err := p.k8sclient.CoreV1().Pods(ns.GetName()).List(context.TODO(), v1.ListOptions{ + LabelSelector: labels.SelectorFromSet( + labels.Set{ + v1alpha1.IPPoolNameLabel: poolName, + }, + ).String(), + }) if err != nil { return nil, err } + for _, pod := range pods.Items { - if pod.GetLabels() != nil && pod.GetLabels()[v1alpha1.IPPoolNameLabel] == poolName { + if pod.Status.Phase != corev1.PodSucceeded { result.Allocations++ } } @@ -462,11 +462,25 @@ func (p provider) getWorkspaceStatus(name string, poolName string) (*v1alpha1.Wo return &result, nil } -func (p provider) Type() string { +func (p *provider) Type() string { return v1alpha1.IPPoolTypeCalico } -func (p provider) SyncStatus(stopCh <-chan struct{}, q workqueue.RateLimitingInterface) error { +func (p *provider) UpdateNamespace(ns *corev1.Namespace, pools []string) error { + if pools != nil { + annostrs, _ := json.Marshal(pools) + if ns.Annotations == nil { + ns.Annotations = make(map[string]string) + } + ns.Annotations[CalicoAnnotationIPPoolV4] = string(annostrs) + } else { + delete(ns.Annotations, CalicoAnnotationIPPoolV4) + } + + return nil +} + +func (p *provider) SyncStatus(stopCh <-chan struct{}, q workqueue.RateLimitingInterface) error { defer utilruntime.HandleCrash() defer p.queue.ShutDown() @@ -488,7 +502,7 @@ func (p provider) SyncStatus(stopCh <-chan struct{}, q workqueue.RateLimitingInt return nil } -func (p provider) processBlock(name string) error { +func (p *provider) processBlock(name string) error { block, err := p.block.Lister().Get(name) if err != nil { if k8serrors.IsNotFound(err) { @@ -510,10 +524,11 @@ func (p provider) processBlock(name string) error { if poolCIDR.IsNetOverlap(blockCIDR.IPNet) { poolName = pool.Name - block.Labels = map[string]string{ + clone := block.DeepCopy() + clone.Labels = map[string]string{ v1alpha1.IPPoolNameLabel: pool.Name, } - p.client.CrdCalicov3().IPAMBlocks().Update(context.TODO(), block, v1.UpdateOptions{}) + p.client.CrdCalicov3().IPAMBlocks().Update(context.TODO(), clone, v1.UpdateOptions{}) break } } @@ -529,52 +544,35 @@ func (p provider) processBlock(name string) error { pod, err := p.pods.Lister().Pods(namespace).Get(name) if err != nil { - continue + if k8serrors.IsNotFound(err) { + continue + } + return err } - labels := pod.GetLabels() + clone := pod.DeepCopy() + labels := clone.GetLabels() if labels != nil { poolLabel := labels[v1alpha1.IPPoolNameLabel] if poolLabel != "" { continue } + } else { + clone.Labels = make(map[string]string) } - retry.RetryOnConflict(retry.DefaultBackoff, func() error { - pod, err = p.k8sclient.CoreV1().Pods(namespace).Get(context.TODO(), name, v1.GetOptions{}) - if err != nil { - return err - } - - labels := pod.GetLabels() - if labels != nil { - poolLabel := labels[v1alpha1.IPPoolNameLabel] - if poolLabel != "" { - return nil - } - } else { - pod.Labels = make(map[string]string) - } - - if pod.GetAnnotations() == nil { - pod.Annotations = make(map[string]string) - } - - annostrs, _ := json.Marshal([]string{poolName}) - pod.GetAnnotations()[CalicoAnnotationIPPoolV4] = string(annostrs) - pod.Labels[v1alpha1.IPPoolNameLabel] = poolName - - _, err = p.k8sclient.CoreV1().Pods(namespace).Update(context.TODO(), pod, v1.UpdateOptions{}) + clone.Labels[v1alpha1.IPPoolNameLabel] = poolName + _, err = p.k8sclient.CoreV1().Pods(namespace).Update(context.TODO(), clone, v1.UpdateOptions{}) + if err != nil { return err - }) + } } - p.poolQueue.Add(poolName) return nil } -func (p provider) processBlockItem() bool { +func (p *provider) processBlockItem() bool { key, quit := p.queue.Get() if quit { return false @@ -592,12 +590,12 @@ func (p provider) processBlockItem() bool { return true } -func (p provider) runWorker() { +func (p *provider) runWorker() { for p.processBlockItem() { } } -func (p provider) addBlock(obj interface{}) { +func (p *provider) addBlock(obj interface{}) { block, ok := obj.(*calicov3.IPAMBlock) if !ok { return @@ -606,7 +604,7 @@ func (p provider) addBlock(obj interface{}) { p.queue.Add(block.Name) } -func (p provider) Default(obj runtime.Object) error { +func (p *provider) Default(obj runtime.Object) error { pod, ok := obj.(*corev1.Pod) if !ok { return nil @@ -639,7 +637,18 @@ func (p provider) Default(obj runtime.Object) error { return nil } -func NewProvider(podInformer informercorev1.PodInformer, ksclient kubesphereclient.Interface, k8sClient clientset.Interface, k8sOptions *k8s.KubernetesOptions) provider { +func (p *provider) addPod(obj interface{}) { + pod, _ := obj.(*corev1.Pod) + + if pod.Labels != nil { + pool := pod.Labels[v1alpha1.IPPoolNameLabel] + if pool != "" && p.poolQueue != nil { + p.poolQueue.Add(pool) + } + } +} + +func NewProvider(k8sInformer k8sinformers.SharedInformerFactory, ksclient kubesphereclient.Interface, k8sClient clientset.Interface, k8sOptions *k8s.KubernetesOptions) *provider { config, err := clientcmd.BuildConfigFromFlags("", k8sOptions.KubeConfig) if err != nil { klog.Fatalf("failed to build k8s config , err=%v", err) @@ -677,14 +686,27 @@ func NewProvider(podInformer informercorev1.PodInformer, ksclient kubesphereclie } } - p := provider{ + p := &provider{ client: client, ksclient: ksclient, k8sclient: k8sClient, - pods: podInformer, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "calicoBlock"), options: opts, } + p.pods = k8sInformer.Core().V1().Pods() + + p.pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(old, new interface{}) { + poolOld := old.(*corev1.Pod).Labels[v1alpha1.IPPoolNameLabel] + poolNew := new.(*corev1.Pod).Labels[v1alpha1.IPPoolNameLabel] + if poolNew == poolOld { + return + } + p.addPod(new) + }, + DeleteFunc: p.addPod, + AddFunc: p.addPod, + }) blockI := calicoInformer.NewSharedInformerFactory(client, defaultResync).Crd().Calicov3().IPAMBlocks() blockI.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/simple/client/network/ippool/ipam/ipam.go b/pkg/simple/client/network/ippool/ipam/ipam.go index a4df002b..646d03e5 100644 --- a/pkg/simple/client/network/ippool/ipam/ipam.go +++ b/pkg/simple/client/network/ippool/ipam/ipam.go @@ -423,6 +423,10 @@ func (c IPAMClient) GetUtilization(args GetUtilizationArgs) ([]*PoolUtilization, return nil, err } + if len(allPools) <= 0 { + return nil, fmt.Errorf("not found pool") + } + // Identify the ones we want and create a PoolUtilization for each of those. wantAllPools := len(args.Pools) == 0 wantedPools := set.FromArray(args.Pools) diff --git a/pkg/simple/client/network/ippool/provider.go b/pkg/simple/client/network/ippool/provider.go index 20e60b5f..43eae33f 100644 --- a/pkg/simple/client/network/ippool/provider.go +++ b/pkg/simple/client/network/ippool/provider.go @@ -17,8 +17,9 @@ limitations under the License. package ippool import ( + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" - v1 "k8s.io/client-go/informers/core/v1" + k8sinformers "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/workqueue" networkv1alpha1 "kubesphere.io/kubesphere/pkg/apis/network/v1alpha1" @@ -35,6 +36,7 @@ type Provider interface { UpdateIPPool(pool *networkv1alpha1.IPPool) error GetIPPoolStats(pool *networkv1alpha1.IPPool) (*networkv1alpha1.IPPool, error) SyncStatus(stopCh <-chan struct{}, q workqueue.RateLimitingInterface) error + UpdateNamespace(ns *corev1.Namespace, pools []string) error Type() string Default(obj runtime.Object) error } @@ -52,6 +54,10 @@ func (p provider) Default(obj runtime.Object) error { return nil } +func (p provider) UpdateNamespace(ns *corev1.Namespace, pools []string) error { + return nil +} + func (p provider) DeleteIPPool(pool *networkv1alpha1.IPPool) (bool, error) { blocks, err := p.ipamclient.ListBlocks(pool.Name) if err != nil { @@ -110,7 +116,7 @@ func newProvider(clientset kubesphereclient.Interface) provider { } } -func NewProvider(podInformer v1.PodInformer, clientset kubesphereclient.Interface, client clientset.Interface, pt string, k8sOptions *k8s.KubernetesOptions) Provider { +func NewProvider(k8sInformer k8sinformers.SharedInformerFactory, clientset kubesphereclient.Interface, client clientset.Interface, pt string, k8sOptions *k8s.KubernetesOptions) Provider { var p Provider switch pt { @@ -120,7 +126,7 @@ func NewProvider(podInformer v1.PodInformer, clientset kubesphereclient.Interfac ipamclient: ipam.NewIPAMClient(clientset, networkv1alpha1.VLAN), } case networkv1alpha1.IPPoolTypeCalico: - p = calicoclient.NewProvider(podInformer, clientset, client, k8sOptions) + p = calicoclient.NewProvider(k8sInformer, clientset, client, k8sOptions) } return p -- GitLab