Unverified commit ab7ecee9, authored by KubeSphere CI Bot, committed by GitHub

Merge pull request #3138 from RolandMa1986/controller-refactoring

refactor controllers
......@@ -18,7 +18,6 @@ package group
import (
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
......@@ -26,7 +25,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
......@@ -38,6 +36,7 @@ import (
kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
iamv1alpha2informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/iam/v1alpha2"
iamv1alpha1listers "kubesphere.io/kubesphere/pkg/client/listers/iam/v1alpha2"
"kubesphere.io/kubesphere/pkg/controller/utils/controller"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
)
......@@ -49,13 +48,12 @@ const (
)
type Controller struct {
+controller.BaseController
scheme *runtime.Scheme
k8sClient kubernetes.Interface
ksClient kubesphere.Interface
groupInformer iamv1alpha2informers.GroupInformer
groupLister iamv1alpha1listers.GroupLister
-groupSynced cache.InformerSynced
-workqueue workqueue.RateLimitingInterface
recorder record.EventRecorder
}
......@@ -66,95 +64,33 @@ func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: k8sClient.CoreV1().Events("")})
-recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName})
ctl := &Controller{
+BaseController: controller.BaseController{
+Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Group"),
+Synced: []cache.InformerSynced{groupInformer.Informer().HasSynced},
+Name: controllerName,
+},
+recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName}),
k8sClient: k8sClient,
ksClient: ksClient,
groupInformer: groupInformer,
groupLister: groupInformer.Lister(),
-groupSynced: groupInformer.Informer().HasSynced,
-workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Group"),
-recorder: recorder,
}
+ctl.Handler = ctl.reconcile
klog.Info("Setting up event handlers")
groupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-AddFunc: ctl.enqueueGroup,
+AddFunc: ctl.Enqueue,
UpdateFunc: func(old, new interface{}) {
-ctl.enqueueGroup(new)
+ctl.Enqueue(new)
},
-DeleteFunc: ctl.enqueueGroup,
+DeleteFunc: ctl.Enqueue,
})
return ctl
}
-func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
-defer utilruntime.HandleCrash()
-defer c.workqueue.ShutDown()
-klog.Info("Starting Group controller")
-klog.Info("Waiting for informer caches to sync")
-synced := []cache.InformerSynced{c.groupSynced}
-if ok := cache.WaitForCacheSync(stopCh, synced...); !ok {
-return fmt.Errorf("failed to wait for caches to sync")
-}
-klog.Info("Starting workers")
-for i := 0; i < threadiness; i++ {
-go wait.Until(c.runWorker, time.Second, stopCh)
-}
-klog.Info("Started workers")
-<-stopCh
-klog.Info("Shutting down workers")
-return nil
-}
-func (c *Controller) enqueueGroup(obj interface{}) {
-var key string
-var err error
-if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
-utilruntime.HandleError(err)
-return
-}
-c.workqueue.Add(key)
-}
-func (c *Controller) runWorker() {
-for c.processNextWorkItem() {
-}
-}
-func (c *Controller) processNextWorkItem() bool {
-obj, shutdown := c.workqueue.Get()
-if shutdown {
-return false
-}
-err := func(obj interface{}) error {
-defer c.workqueue.Done(obj)
-var key string
-var ok bool
-if key, ok = obj.(string); !ok {
-c.workqueue.Forget(obj)
-utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
-return nil
-}
-if err := c.reconcile(key); err != nil {
-c.workqueue.AddRateLimited(key)
-return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
-}
-c.workqueue.Forget(obj)
-klog.Infof("Successfully synced %s:%s", "key", key)
-return nil
-}(obj)
-if err != nil {
-utilruntime.HandleError(err)
-return true
-}
-return true
-}
+func (c *Controller) Start(stopCh <-chan struct{}) error {
+return c.Run(1, stopCh)
+}
// reconcile handles Group informer events and cleans up related resources when a group is being deleted.
......@@ -207,10 +143,6 @@ func (c *Controller) reconcile(key string) error {
return nil
}
-func (c *Controller) Start(stopCh <-chan struct{}) error {
-return c.Run(1, stopCh)
-}
func (c *Controller) deleteGroupBindings(group *iam1alpha2.Group) error {
// GroupBindings that were created by KubeSphere will be deleted directly.
......
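With the queueing and worker machinery moved into the embedded BaseController, the refactored Group controller only needs to be constructed and started. A minimal usage sketch, assuming the generated externalversions informer factory and pre-built clients (none of this wiring appears in the diff itself):

// Illustrative only: k8sClient/ksClient construction is assumed.
informerFactory := externalversions.NewSharedInformerFactory(ksClient, 0)
c := NewController(k8sClient, ksClient, informerFactory.Iam().V1alpha2().Groups())

stopCh := make(chan struct{})
informerFactory.Start(stopCh)

// Start delegates to the embedded BaseController.Run with a single worker.
if err := c.Start(stopCh); err != nil {
	klog.Error(err)
}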
......@@ -90,7 +90,6 @@ func (f *fixture) newController() (*Controller, ksinformers.SharedInformerFactor
c := NewController(f.k8sclient, f.ksclient,
ksinformers.Iam().V1alpha2().Groups())
-c.groupSynced = alwaysReady
c.recorder = &record.FakeRecorder{}
return c, ksinformers, k8sinformers
......@@ -113,7 +112,7 @@ func (f *fixture) runController(group string, startInformers bool, expectError b
k8sI.Start(stopCh)
}
-err := c.reconcile(group)
+err := c.Handler(group)
if !expectError && err != nil {
f.t.Errorf("error syncing group: %v", err)
} else if expectError && err == nil {
......
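Because the reconcile function is now exposed through the Handler field, a test fixture can also substitute a stub without invoking the real reconciler at all; for example (hypothetical, in the style of the fixture above):

// Hypothetical stub: count invocations instead of running the real reconcile.
var calls int
c.Handler = func(key string) error {
	calls++
	return nil
}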
......@@ -18,14 +18,12 @@ package groupbinding
import (
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
......@@ -37,6 +35,7 @@ import (
kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
iamv1alpha2informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/iam/v1alpha2"
iamv1alpha2listers "kubesphere.io/kubesphere/pkg/client/listers/iam/v1alpha2"
"kubesphere.io/kubesphere/pkg/controller/utils/controller"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
"sigs.k8s.io/controller-runtime/pkg/client"
)
......@@ -49,13 +48,12 @@ const (
)
type Controller struct {
+controller.BaseController
scheme *runtime.Scheme
k8sClient kubernetes.Interface
ksClient kubesphere.Interface
groupBindingInformer iamv1alpha2informers.GroupBindingInformer
groupBindingLister iamv1alpha2listers.GroupBindingLister
-groupBindingSynced cache.InformerSynced
-workqueue workqueue.RateLimitingInterface
recorder record.EventRecorder
}
......@@ -67,97 +65,29 @@ func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: k8sClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName})
ctl := &Controller{
+BaseController: controller.BaseController{
+Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "GroupBinding"),
+Synced: []cache.InformerSynced{groupBindingInformer.Informer().HasSynced},
+Name: controllerName,
+},
k8sClient: k8sClient,
ksClient: ksClient,
groupBindingInformer: groupBindingInformer,
groupBindingLister: groupBindingInformer.Lister(),
-groupBindingSynced: groupBindingInformer.Informer().HasSynced,
-workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "GroupBinding"),
recorder: recorder,
}
+ctl.Handler = ctl.reconcile
klog.Info("Setting up event handlers")
groupBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-AddFunc: ctl.enqueueGroupBinding,
+AddFunc: ctl.Enqueue,
UpdateFunc: func(old, new interface{}) {
-ctl.enqueueGroupBinding(new)
+ctl.Enqueue(new)
},
-DeleteFunc: ctl.enqueueGroupBinding,
+DeleteFunc: ctl.Enqueue,
})
return ctl
}
-func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
-defer utilruntime.HandleCrash()
-defer c.workqueue.ShutDown()
-klog.Info("Starting GroupBinding controller")
-klog.Info("Waiting for informer caches to sync")
-synced := []cache.InformerSynced{c.groupBindingSynced}
-if ok := cache.WaitForCacheSync(stopCh, synced...); !ok {
-return fmt.Errorf("failed to wait for caches to sync")
-}
-klog.Info("Starting workers")
-for i := 0; i < threadiness; i++ {
-go wait.Until(c.runWorker, time.Second, stopCh)
-}
-klog.Info("Started workers")
-<-stopCh
-klog.Info("Shutting down workers")
-return nil
-}
-func (c *Controller) enqueueGroupBinding(obj interface{}) {
-var key string
-var err error
-if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
-utilruntime.HandleError(err)
-return
-}
-c.workqueue.Add(key)
-}
-func (c *Controller) runWorker() {
-for c.processNextWorkItem() {
-}
-}
-func (c *Controller) processNextWorkItem() bool {
-obj, shutdown := c.workqueue.Get()
-if shutdown {
-return false
-}
-err := func(obj interface{}) error {
-defer c.workqueue.Done(obj)
-var key string
-var ok bool
-if key, ok = obj.(string); !ok {
-c.workqueue.Forget(obj)
-utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
-return nil
-}
-if err := c.reconcile(key); err != nil {
-c.workqueue.AddRateLimited(key)
-return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
-}
-c.workqueue.Forget(obj)
-klog.Infof("Successfully synced %s:%s", "key", key)
-return nil
-}(obj)
-if err != nil {
-utilruntime.HandleError(err)
-return true
-}
-return true
-}
// reconcile handles GroupBinding informer events; it updates the user's Groups property to match the current GroupBinding.
func (c *Controller) reconcile(key string) error {
......
......@@ -117,7 +117,7 @@ func (f *fixture) newController() (*Controller, ksinformers.SharedInformerFactor
c := NewController(f.k8sclient, f.ksclient,
ksinformers.Iam().V1alpha2().GroupBindings())
-c.groupBindingSynced = alwaysReady
+c.Synced = []cache.InformerSynced{alwaysReady}
c.recorder = &record.FakeRecorder{}
return c, ksinformers, k8sinformers
......
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)
// BaseController provides a controller template for watching a primary resource that is defined as a CRD.
type BaseController struct {
// Workers will wait for these informer caches to be synced
Synced []cache.InformerSynced
// Workqueue is a rate limited work queue.
Workqueue workqueue.RateLimitingInterface
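// Handler is the reconcile function invoked for each key taken off the Workqueue; a non-nil error requeues the key.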
Handler func(key string) error
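// MaxRetries bounds how many times a failing key is requeued before being dropped; 0 means no limit.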
MaxRetries int
Name string
}
// Run waits for the informer caches to sync, then starts the workers.
// It will block until stopCh is closed, at which point it will shut down
// the workqueue and wait for workers to finish processing their current
// work items.
func (c *BaseController) Run(threadiness int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.Workqueue.ShutDown()
klog.Infof("Starting controller: %s", c.Name)
klog.Infof("Waiting for informer caches to sync for: %s", c.Name)
if ok := cache.WaitForCacheSync(stopCh, c.Synced...); !ok {
return fmt.Errorf("failed to wait for caches to sync for: %s", c.Name)
}
klog.Infof("Starting workers for: %s", c.Name)
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
klog.Infof("Started workers for: %s", c.Name)
<-stopCh
klog.Infof("Shutting down workers for: %s", c.Name)
return nil
}
// Enqueue takes a primary resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than the primary resource.
func (c *BaseController) Enqueue(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
c.Workqueue.Add(key)
}
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *BaseController) runWorker() {
for c.processNextWorkItem() {
}
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it by calling the Handler.
func (c *BaseController) processNextWorkItem() bool {
obj, shutdown := c.Workqueue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer c.Workqueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.Workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in Workqueue but got %#v in %s", obj, c.Name))
return nil
}
if err := c.Handler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors,
// as long as max retries has not been reached or no retry limit is set.
if c.MaxRetries == 0 || c.Workqueue.NumRequeues(key) < c.MaxRetries {
c.Workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s' in %s: %s, requeuing ", key, c.Name, err.Error())
}
klog.V(4).Infof("Dropping %s out of the queue in %s: %s", key, c.Name, err)
utilruntime.HandleError(err)
return nil
}
c.Workqueue.Forget(obj)
klog.Infof("Successfully Synced %s:%s in %s", "key", key, c.Name)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
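Taken together, a new controller in this pattern only embeds BaseController, fills in Workqueue, Synced, Name and (optionally) MaxRetries, assigns Handler, and registers Enqueue with its informer. A sketch using a hypothetical FooController (the type, its informer parameter, and the reconcile body are illustrative, not code from this PR):

package foo

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"

	"kubesphere.io/kubesphere/pkg/controller/utils/controller"
)

type FooController struct {
	controller.BaseController
	// clients and listers would live here
}

func NewFooController(informer cache.SharedIndexInformer) *FooController {
	c := &FooController{
		BaseController: controller.BaseController{
			Workqueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foo"),
			Synced:     []cache.InformerSynced{informer.HasSynced},
			Name:       "foo-controller",
			MaxRetries: 5, // drop a key after 5 failed reconciles; 0 would retry forever
		},
	}
	c.Handler = c.reconcile
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.Enqueue,
		UpdateFunc: func(old, new interface{}) { c.Enqueue(new) },
		DeleteFunc: c.Enqueue,
	})
	return c
}

func (c *FooController) reconcile(key string) error {
	// resolve the namespace/name key through a lister and drive the
	// object toward its desired state here
	return nil
}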
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
var (
alwaysReady = func() bool { return true }
noResyncPeriodFunc = func() time.Duration { return 0 }
controllerName = "base-controler-test"
)
type fixture struct {
t *testing.T
stopCh chan struct{}
BaseController
handleTimes int
}
type fakeObj struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
}
func (in *fakeObj) DeepCopyInto(out *fakeObj) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new fakeObj.
func (in *fakeObj) DeepCopy() *fakeObj {
if in == nil {
return nil
}
out := new(fakeObj)
in.DeepCopyInto(out)
return out
}
func (in *fakeObj) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
func newFixture(t *testing.T, retryTimes int) *fixture {
f := &fixture{}
f.t = t
f.stopCh = make(chan struct{})
f.BaseController = BaseController{
Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Group"),
Synced: []cache.InformerSynced{alwaysReady},
Name: controllerName,
}
f.MaxRetries = retryTimes
return f
}
func (f *fixture) reconcile(key string) error {
f.handleTimes++
f.t.Logf("Current key is %s", key)
f.stopCh <- struct{}{}
return nil
}
func (f *fixture) retryReconcile(key string) error {
f.handleTimes++
f.t.Logf("Current key is %s", key)
if f.Workqueue.NumRequeues(key) == 2 {
defer func(f *fixture) { f.stopCh <- struct{}{} }(f)
}
err := fmt.Errorf("retry times: %d", f.Workqueue.NumRequeues(key))
return err
}
func createFakeObj() metav1.Object {
return &fakeObj{
ObjectMeta: metav1.ObjectMeta{Name: "Hello"},
}
}
func TestDequeue(t *testing.T) {
f := newFixture(t, 0)
f.Handler = f.reconcile
go f.Run(1, f.stopCh)
obj := createFakeObj()
f.Enqueue(obj)
<-f.stopCh
if f.handleTimes != 1 {
t.Errorf("handler called %d times, want 1", f.handleTimes)
}
}
func TestRetry(t *testing.T) {
f := newFixture(t, 2)
f.Handler = f.retryReconcile
go f.Run(1, f.stopCh)
obj := createFakeObj()
f.Enqueue(obj)
<-f.stopCh
if f.handleTimes != f.MaxRetries+1 {
t.Errorf("handler called %d times, want %d", f.handleTimes, f.MaxRetries+1)
}
}
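Both fixtures build their queue with workqueue.DefaultControllerRateLimiter, which combines a per-item exponential backoff (5ms up to 1000s) with an overall token bucket. A controller that wants a tighter bound on retry latency could hand BaseController a different limiter; a sketch, not part of this PR:

// Cap per-item exponential backoff at 30s instead of the default 1000s.
limiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second)
queue := workqueue.NewNamedRateLimitingQueue(limiter, "Foo")
// queue would then be assigned to BaseController.Workqueue.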