diff --git a/cmd/controller-manager/app/server.go b/cmd/controller-manager/app/server.go index d9db11ad2b9330d77c487734d566f55fbc33032a..e1f1d8ea807fd6df8a02b975bca7b915e0e52832 100644 --- a/cmd/controller-manager/app/server.go +++ b/cmd/controller-manager/app/server.go @@ -17,22 +17,15 @@ limitations under the License. package app import ( - "context" "fmt" "github.com/spf13/cobra" - "k8s.io/api/core/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/client-go/tools/record" cliflag "k8s.io/component-base/cli/flag" "k8s.io/klog" "k8s.io/klog/klogr" "kubesphere.io/kubesphere/cmd/controller-manager/app/options" "kubesphere.io/kubesphere/pkg/apis" controllerconfig "kubesphere.io/kubesphere/pkg/apiserver/config" - "kubesphere.io/kubesphere/pkg/client/clientset/versioned/scheme" "kubesphere.io/kubesphere/pkg/controller/namespace" "kubesphere.io/kubesphere/pkg/controller/network/nsnetworkpolicy" "kubesphere.io/kubesphere/pkg/controller/user" @@ -46,11 +39,10 @@ import ( "kubesphere.io/kubesphere/pkg/simple/client/s3" "kubesphere.io/kubesphere/pkg/utils/term" "os" + application "sigs.k8s.io/application/controllers" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/runtime/signals" "sigs.k8s.io/controller-runtime/pkg/webhook" - - application "sigs.k8s.io/application/controllers" ) func NewControllerManagerCommand() *cobra.Command { @@ -72,6 +64,8 @@ func NewControllerManagerCommand() *cobra.Command { LeaderElect: s.LeaderElect, WebhookCertDir: s.WebhookCertDir, } + } else { + klog.Fatal("Failed to load configuration from disk", err) } cmd := &cobra.Command{ @@ -83,7 +77,7 @@ func NewControllerManagerCommand() *cobra.Command { os.Exit(1) } - if err = Run(s, signals.SetupSignalHandler()); err != nil { + if err = run(s, signals.SetupSignalHandler()); err != nil { klog.Error(err) os.Exit(1) } @@ -101,13 +95,13 @@ func NewControllerManagerCommand() *cobra.Command { usageFmt := "Usage:\n %s\n" cols, _, _ := term.TerminalSize(cmd.OutOrStdout()) cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) { - fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine()) + _, _ = fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine()) cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols) }) return cmd } -func Run(s *options.KubeSphereControllerManagerOptions, stopCh <-chan struct{}) error { +func run(s *options.KubeSphereControllerManagerOptions, stopCh <-chan struct{}) error { kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions) if err != nil { klog.Errorf("Failed to create kubernetes clientset %v", err) @@ -160,126 +154,89 @@ func Run(s *options.KubeSphereControllerManagerOptions, stopCh <-chan struct{}) kubernetesClient.Snapshot(), kubernetesClient.ApiExtensions()) - run := func(ctx context.Context) { - klog.V(0).Info("setting up manager") - // Use 8443 instead of 443 cause we need root permission to bind port 443 - mgr, err := manager.New(kubernetesClient.Config(), manager.Options{CertDir: s.WebhookCertDir, Port: 8443}) - if err != nil { - klog.Fatalf("unable to set up overall controller manager: %v", err) - } - - klog.V(0).Info("setting up scheme") - if err := apis.AddToScheme(mgr.GetScheme()); err != nil { - klog.Fatalf("unable add APIs to scheme: %v", err) - } - - klog.V(0).Info("Setting up controllers") - err = workspace.Add(mgr) - if err != nil { - klog.Fatal("Unable to create workspace controller") - } - - err = namespace.Add(mgr) - if err != nil { - klog.Fatal("Unable to create namespace controller") - } - - err = (&application.ApplicationReconciler{ - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - Mapper: mgr.GetRESTMapper(), - Log: klogr.New(), - }).SetupWithManager(mgr) - if err != nil { - klog.Fatal("Unable to create application controller") - } + mgrOptions := manager.Options{ + CertDir: s.WebhookCertDir, + Port: 8443, + } - // TODO(jeff): refactor config with CRD - servicemeshEnabled := s.ServiceMeshOptions != nil && len(s.ServiceMeshOptions.IstioPilotHost) != 0 - if err = addControllers(mgr, - kubernetesClient, - informerFactory, - devopsClient, - s3Client, - ldapClient, - s.AuthenticationOptions, - openpitrixClient, - s.MultiClusterOptions.Enable, - s.NetworkOptions, - servicemeshEnabled, - s.AuthenticationOptions.KubectlImage, stopCh); err != nil { - klog.Fatalf("unable to register controllers to the manager: %v", err) + if s.LeaderElect { + mgrOptions = manager.Options{ + CertDir: s.WebhookCertDir, + Port: 8443, + LeaderElection: s.LeaderElect, + LeaderElectionNamespace: "kubesphere-system", + LeaderElectionID: "ks-controller-manager-leader-election", + LeaseDuration: &s.LeaderElection.LeaseDuration, + RetryPeriod: &s.LeaderElection.RetryPeriod, + RenewDeadline: &s.LeaderElection.RenewDeadline, } + } - // Start cache data after all informer is registered - informerFactory.Start(stopCh) - - // Setup webhooks - klog.Info("setting up webhook server") - hookServer := mgr.GetWebhookServer() - - klog.Info("registering webhooks to the webhook server") - hookServer.Register("/validate-email-iam-kubesphere-io-v1alpha2-user", &webhook.Admission{Handler: &user.EmailValidator{Client: mgr.GetClient()}}) - hookServer.Register("/validate-nsnp-kubesphere-io-v1alpha1-network", &webhook.Admission{Handler: &nsnetworkpolicy.NSNPValidator{Client: mgr.GetClient()}}) - - klog.V(0).Info("Starting the controllers.") - if err = mgr.Start(stopCh); err != nil { - klog.Fatalf("unable to run the manager: %v", err) - } + klog.V(0).Info("setting up manager") - select {} + // Use 8443 instead of 443 cause we need root permission to bind port 443 + mgr, err := manager.New(kubernetesClient.Config(), mgrOptions) + if err != nil { + klog.Fatalf("unable to set up overall controller manager: %v", err) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + if err = apis.AddToScheme(mgr.GetScheme()); err != nil { + klog.Fatalf("unable add APIs to scheme: %v", err) + } - go func() { - <-stopCh - cancel() - }() + err = workspace.Add(mgr) + if err != nil { + klog.Fatal("Unable to create workspace controller") + } - if !s.LeaderElect { - run(ctx) - return nil + err = namespace.Add(mgr) + if err != nil { + klog.Fatal("Unable to create namespace controller") } - id, err := os.Hostname() + err = (&application.ApplicationReconciler{ + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Mapper: mgr.GetRESTMapper(), + Log: klogr.New(), + }).SetupWithManager(mgr) if err != nil { - return err + klog.Fatal("Unable to create application controller") } - // add a uniquifier so that two processes on the same host don't accidentally both become active - id = id + "_" + string(uuid.NewUUID()) + // TODO(jeff): refactor config with CRD + servicemeshEnabled := s.ServiceMeshOptions != nil && len(s.ServiceMeshOptions.IstioPilotHost) != 0 + if err = addControllers(mgr, + kubernetesClient, + informerFactory, + devopsClient, + s3Client, + ldapClient, + s.AuthenticationOptions, + openpitrixClient, + s.MultiClusterOptions.Enable, + s.NetworkOptions, + servicemeshEnabled, + s.AuthenticationOptions.KubectlImage, stopCh); err != nil { + klog.Fatalf("unable to register controllers to the manager: %v", err) + } - lock, err := resourcelock.New(resourcelock.LeasesResourceLock, - "kubesphere-system", - "ks-controller-manager", - kubernetesClient.Kubernetes().CoreV1(), - kubernetesClient.Kubernetes().CoordinationV1(), - resourcelock.ResourceLockConfig{ - Identity: id, - EventRecorder: record.NewBroadcaster().NewRecorder(scheme.Scheme, v1.EventSource{ - Component: "ks-controller-manager", - }), - }) + // Start cache data after all informer is registered + klog.V(0).Info("Starting cache resource from apiserver...") + informerFactory.Start(stopCh) - if err != nil { - klog.Fatalf("error creating lock: %v", err) - } + // Setup webhooks + klog.V(2).Info("setting up webhook server") + hookServer := mgr.GetWebhookServer() - leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ - Lock: lock, - LeaseDuration: s.LeaderElection.LeaseDuration, - RenewDeadline: s.LeaderElection.RenewDeadline, - RetryPeriod: s.LeaderElection.RetryPeriod, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: run, - OnStoppedLeading: func() { - klog.Errorf("leadership lost") - os.Exit(0) - }, - }, - }) + klog.V(2).Info("registering webhooks to the webhook server") + hookServer.Register("/validate-email-iam-kubesphere-io-v1alpha2-user", &webhook.Admission{Handler: &user.EmailValidator{Client: mgr.GetClient()}}) + hookServer.Register("/validate-nsnp-kubesphere-io-v1alpha1-network", &webhook.Admission{Handler: &nsnetworkpolicy.NSNPValidator{Client: mgr.GetClient()}}) + + klog.V(0).Info("Starting the controllers.") + if err = mgr.Start(stopCh); err != nil { + klog.Fatalf("unable to run the manager: %v", err) + } return nil } diff --git a/pkg/controller/storage/capability/capability_controller.go b/pkg/controller/storage/capability/capability_controller.go index 8d9006a57ec3908a82a897741f568810a3b10d88..e0cf511f7e59b055b6874e09221b172e048f65fa 100644 --- a/pkg/controller/storage/capability/capability_controller.go +++ b/pkg/controller/storage/capability/capability_controller.go @@ -191,7 +191,7 @@ func (c *StorageCapabilityController) handlerCSIDriver(obj interface{}) { } for _, storageClass := range storageClasses { if storageClass.Provisioner == csiDriver.Name { - klog.Info("enqueue StorageClass when handler csiDriver", storageClass) + klog.V(4).Infof("enqueue StorageClass %s when handling csiDriver", storageClass.Name) c.enqueueStorageClass(storageClass) } }