devopscredential_controller.go 8.2 KB
Newer Older
R
runzexia 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
package devopscredential

import (
	"fmt"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	corev1informer "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	corev1lister "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
	devopsv1alpha3 "kubesphere.io/kubesphere/pkg/apis/devops/v1alpha3"
	kubesphereclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
	"kubesphere.io/kubesphere/pkg/constants"
	devopsClient "kubesphere.io/kubesphere/pkg/simple/client/devops"
	"kubesphere.io/kubesphere/pkg/utils/k8sutil"
	"kubesphere.io/kubesphere/pkg/utils/sliceutil"
	"reflect"
	"strings"
	"time"
)

// Controller reconciles Kubernetes secrets whose type carries the DevOps
// credential prefix with the credentials stored behind the DevOps client.
// Secret keys are queued on a rate-limited work queue and handled by
// syncHandler.
type Controller struct {
	// Kubernetes and KubeSphere API clients. kubesphereClient is not set by
	// NewController.
	client           clientset.Interface
	kubesphereClient kubesphereclient.Interface

	// devopsClient creates, updates and deletes credentials in DevOps projects.
	devopsClient devopsClient.Interface

	// Event broadcaster and the recorder derived from it.
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// Lister and cache-sync probe for secrets.
	secretLister corev1lister.SecretLister
	secretSynced cache.InformerSynced

	// Lister and cache-sync probe for namespaces.
	namespaceLister corev1lister.NamespaceLister
	namespaceSynced cache.InformerSynced

	// workqueue holds the namespace/name keys of secrets awaiting a sync.
	workqueue workqueue.RateLimitingInterface

	// workerLoopPeriod is the period handed to wait.Until for each worker.
	workerLoopPeriod time.Duration
}

func NewController(client clientset.Interface,
	devopsClinet devopsClient.Interface,
	namespaceInformer corev1informer.NamespaceInformer,
	secretInformer corev1informer.SecretInformer) *Controller {

	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(func(format string, args ...interface{}) {
		klog.Info(fmt.Sprintf(format, args))
	})
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
R
runzexia 已提交
63
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "devopscredential-controller"})
R
runzexia 已提交
64 65 66 67

	v := &Controller{
		client:           client,
		devopsClient:     devopsClinet,
R
runzexia 已提交
68
		workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "devopscredential"),
R
runzexia 已提交
69 70 71 72 73 74 75 76 77 78 79 80
		secretLister:     secretInformer.Lister(),
		secretSynced:     secretInformer.Informer().HasSynced,
		namespaceLister:  namespaceInformer.Lister(),
		namespaceSynced:  namespaceInformer.Informer().HasSynced,
		workerLoopPeriod: time.Second,
	}

	v.eventBroadcaster = broadcaster
	v.eventRecorder = recorder

	secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
R
runzexia 已提交
81 82
			secret, ok := obj.(*v1.Secret)
			if ok && strings.HasPrefix(string(secret.Type), devopsv1alpha3.DevOpsCredentialPrefix) {
R
runzexia 已提交
83 84 85 86
				v.enqueueSecret(obj)
			}
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
R
runzexia 已提交
87 88 89
			old, ook := oldObj.(*v1.Secret)
			new, nok := newObj.(*v1.Secret)
			if ook && nok && old.ResourceVersion == new.ResourceVersion {
R
runzexia 已提交
90 91
				return
			}
R
runzexia 已提交
92
			if ook && nok && strings.HasPrefix(string(new.Type), devopsv1alpha3.DevOpsCredentialPrefix) {
R
runzexia 已提交
93 94 95 96
				v.enqueueSecret(newObj)
			}
		},
		DeleteFunc: func(obj interface{}) {
R
runzexia 已提交
97 98
			secret, ok := obj.(*v1.Secret)
			if ok && strings.HasPrefix(string(secret.Type), devopsv1alpha3.DevOpsCredentialPrefix) {
R
runzexia 已提交
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
				v.enqueueSecret(obj)
			}
		},
	})
	return v
}

// enqueueSecret computes the namespace/name key of obj and adds it to the
// work queue. When no key can be derived, the error is reported through
// utilruntime.HandleError and nothing is queued.
func (c *Controller) enqueueSecret(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.Add(key)
}

// processNextWorkItem takes one item off the work queue and runs syncHandler
// on it. It returns false only when the queue has been shut down, which tells
// the worker loop to stop; in every other case it returns true.
//
// A failed sync is re-added with rate limiting and reported once through
// utilruntime.HandleError. Items that are not strings are forgotten so they
// are not retried.
func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}

	err := func(obj interface{}) error {
		// Done must always be called so the queue knows processing finished.
		defer c.workqueue.Done(obj)

		key, ok := obj.(string)
		if !ok {
			// Retrying an item of the wrong type cannot succeed.
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		if err := c.syncHandler(key); err != nil {
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		}
		// Success: clear the rate-limiter history for this item.
		c.workqueue.Forget(obj)
		klog.V(5).Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		// HandleError is the single reporting point for sync failures; the
		// extra klog.Error call duplicated the message.
		utilruntime.HandleError(err)
	}
	return true
}

// worker processes queue items one after another and returns as soon as
// processNextWorkItem reports false.
func (c *Controller) worker() {
	for {
		if !c.processNextWorkItem() {
			return
		}
	}
}

// Start runs the controller with a single worker and returns whatever Run
// returns.
func (c *Controller) Start(stopCh <-chan struct{}) error {
	err := c.Run(1, stopCh)
	return err
}

func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()

R
runzexia 已提交
168 169
	klog.Info("starting devopscredential controller")
	defer klog.Info("shutting down  devopscredential controller")
R
runzexia 已提交
170 171 172 173 174 175 176 177 178 179 180 181 182 183

	if !cache.WaitForCacheSync(stopCh, c.secretSynced) {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	for i := 0; i < workers; i++ {
		go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
	}

	<-stopCh
	return nil
}

// syncHandler compares the actual state with the desired, and attempts to
R
runzexia 已提交
184
// converge the two. It then updates the Status block of the secret resource
R
runzexia 已提交
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
	nsName, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.Error(err, fmt.Sprintf("could not split copySecret meta %s ", key))
		return nil
	}
	namespace, err := c.namespaceLister.Get(nsName)
	if err != nil {
		if errors.IsNotFound(err) {
			klog.Info(fmt.Sprintf("namespace '%s' in work queue no longer exists ", key))
			return nil
		}
		klog.Error(err, fmt.Sprintf("could not get namespace %s ", key))
		return err
	}
	if !isDevOpsProjectAdminNamespace(namespace) {
		err := fmt.Errorf("cound not create credential in normal namespaces %s", namespace.Name)
		klog.Warning(err)
		return err
	}

	secret, err := c.secretLister.Secrets(nsName).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			klog.Info(fmt.Sprintf("secret '%s' in work queue no longer exists ", key))
			return nil
		}
		klog.Error(err, fmt.Sprintf("could not get secret %s ", key))
		return err
	}

	copySecret := secret.DeepCopy()
	// DeletionTimestamp.IsZero() means copySecret has not been deleted.
S
shaowenchen 已提交
219
	if secret.ObjectMeta.DeletionTimestamp.IsZero() {
R
runzexia 已提交
220
		// https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#finalizers
S
shaowenchen 已提交
221
		if !sliceutil.HasString(secret.ObjectMeta.Finalizers, devopsv1alpha3.CredentialFinalizerName) {
R
runzexia 已提交
222 223 224 225
			copySecret.ObjectMeta.Finalizers = append(copySecret.ObjectMeta.Finalizers, devopsv1alpha3.CredentialFinalizerName)
		}
		// Check secret config exists, otherwise we will create it.
		// if secret exists, update config
S
shaowenchen 已提交
226 227
		_, err := c.devopsClient.CreateCredentialInProject(nsName, copySecret)
		if err != nil {
R
runzexia 已提交
228 229 230
			if _, ok := copySecret.Annotations[devopsv1alpha3.CredentialAutoSyncAnnoKey]; ok {
				_, err := c.devopsClient.UpdateCredentialInProject(nsName, copySecret)
				if err != nil {
S
shaowenchen 已提交
231
					klog.V(8).Info(err, fmt.Sprintf("failed to update secret %s ", key))
R
runzexia 已提交
232 233 234 235 236 237 238 239
					return err
				}
			}
		}

	} else {
		// Finalizers processing logic
		if sliceutil.HasString(copySecret.ObjectMeta.Finalizers, devopsv1alpha3.CredentialFinalizerName) {
S
shaowenchen 已提交
240
			if _, err := c.devopsClient.DeleteCredentialInProject(nsName, secret.Name); err != nil {
S
shaowenchen 已提交
241
				klog.V(8).Info(err, fmt.Sprintf("failed to delete secret %s in devops", key))
R
runzexia 已提交
242 243 244 245 246 247 248 249 250 251 252
				return err
			}
			copySecret.ObjectMeta.Finalizers = sliceutil.RemoveString(copySecret.ObjectMeta.Finalizers, func(item string) bool {
				return item == devopsv1alpha3.CredentialFinalizerName
			})

		}
	}
	if !reflect.DeepEqual(secret, copySecret) {
		_, err = c.client.CoreV1().Secrets(nsName).Update(copySecret)
		if err != nil {
S
shaowenchen 已提交
253
			klog.V(8).Info(err, fmt.Sprintf("failed to update secret %s ", key))
R
runzexia 已提交
254 255 256 257 258 259 260 261 262 263 264 265 266 267
			return err
		}
	}

	return nil
}

// isDevOpsProjectAdminNamespace reports whether namespace carries the
// constants.DevOpsProjectLabelKey label and k8sutil.IsControlledBy accepts its
// owner references for kind devopsv1alpha3.ResourceKindDevOpsProject with an
// empty name.
func isDevOpsProjectAdminNamespace(namespace *v1.Namespace) bool {
	if _, labelled := namespace.Labels[constants.DevOpsProjectLabelKey]; !labelled {
		return false
	}
	return k8sutil.IsControlledBy(namespace.OwnerReferences,
		devopsv1alpha3.ResourceKindDevOpsProject, "")
}