server.go 7.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*

 Copyright 2019 The KubeSphere Authors.

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.

*/

package app

import (
J
Jeff 已提交
22
	"context"
23 24
	"fmt"
	"github.com/spf13/cobra"
J
Jeff 已提交
25
	v1 "k8s.io/api/core/v1"
26
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
J
Jeff 已提交
27 28 29 30
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/tools/record"
31 32 33 34
	cliflag "k8s.io/component-base/cli/flag"
	"k8s.io/klog"
	"kubesphere.io/kubesphere/cmd/controller-manager/app/options"
	"kubesphere.io/kubesphere/pkg/apis"
35
	controllerconfig "kubesphere.io/kubesphere/pkg/apiserver/config"
J
Jeff 已提交
36
	"kubesphere.io/kubesphere/pkg/client/clientset/versioned/scheme"
37
	"kubesphere.io/kubesphere/pkg/controller/namespace"
D
Duan Jiong 已提交
38
	"kubesphere.io/kubesphere/pkg/controller/network/nsnetworkpolicy"
R
runzexia 已提交
39
	"kubesphere.io/kubesphere/pkg/controller/user"
40
	"kubesphere.io/kubesphere/pkg/controller/workspace"
Z
zryfish 已提交
41 42 43
	"kubesphere.io/kubesphere/pkg/simple/client/openpitrix"
	"sigs.k8s.io/controller-runtime/pkg/webhook"

44
	"kubesphere.io/kubesphere/pkg/informers"
R
runzexia 已提交
45
	"kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins"
46
	"kubesphere.io/kubesphere/pkg/simple/client/k8s"
R
runzexia 已提交
47
	"kubesphere.io/kubesphere/pkg/simple/client/s3"
48 49 50 51 52 53 54 55
	"kubesphere.io/kubesphere/pkg/utils/term"
	"os"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
)

func NewControllerManagerCommand() *cobra.Command {
	s := options.NewKubeSphereControllerManagerOptions()
56 57
	conf, err := controllerconfig.TryLoadFromDisk()
	if err == nil {
H
hongming 已提交
58
		// make sure LeaderElection is not nil
59 60 61 62 63
		s = &options.KubeSphereControllerManagerOptions{
			KubernetesOptions: conf.KubernetesOptions,
			DevopsOptions:     conf.DevopsOptions,
			S3Options:         conf.S3Options,
			OpenPitrixOptions: conf.OpenPitrixOptions,
H
hongming 已提交
64
			LeaderElection:    s.LeaderElection,
Z
zryfish 已提交
65
			LeaderElect:       s.LeaderElect,
66 67
		}
	}
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84

	cmd := &cobra.Command{
		Use:  "controller-manager",
		Long: `KubeSphere controller manager is a daemon that`,
		Run: func(cmd *cobra.Command, args []string) {
			if errs := s.Validate(); len(errs) != 0 {
				klog.Error(utilerrors.NewAggregate(errs))
				os.Exit(1)
			}

			if err = Run(s, signals.SetupSignalHandler()); err != nil {
				os.Exit(1)
			}
		},
	}

	fs := cmd.Flags()
85
	namedFlagSets := s.Flags()
86 87 88 89 90 91 92 93 94 95 96 97 98 99

	for _, f := range namedFlagSets.FlagSets {
		fs.AddFlagSet(f)
	}

	usageFmt := "Usage:\n  %s\n"
	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
	cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
		fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
		cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
	})
	return cmd
}

100 101 102 103 104
func Run(s *options.KubeSphereControllerManagerOptions, stopCh <-chan struct{}) error {
	kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
	if err != nil {
		klog.Errorf("Failed to create kubernetes clientset %v", err)
		return err
105 106
	}

107
	openpitrixClient, err := openpitrix.NewClient(s.OpenPitrixOptions)
Z
zryfish 已提交
108
	if err != nil {
109 110
		klog.Errorf("Failed to create openpitrix client %v", err)
		return err
Z
zryfish 已提交
111 112
	}

R
runzexia 已提交
113 114 115 116 117
	devopsClient, err := jenkins.NewDevopsClient(s.DevopsOptions)
	if err != nil {
		klog.Errorf("Failed to create devops client %v", err)
		return err
	}
118

R
runzexia 已提交
119 120
	s3Client, err := s3.NewS3Client(s.S3Options)
	if err != nil {
R
runzexia 已提交
121
		klog.Errorf("Failed to create s3 client %v", err)
R
runzexia 已提交
122 123
		return err
	}
124 125

	informerFactory := informers.NewInformerFactories(kubernetesClient.Kubernetes(), kubernetesClient.KubeSphere(), kubernetesClient.Istio(), kubernetesClient.Application())
126

J
Jeff 已提交
127 128
	run := func(ctx context.Context) {
		klog.V(0).Info("setting up manager")
129
		mgr, err := manager.New(kubernetesClient.Config(), manager.Options{})
J
Jeff 已提交
130 131 132 133 134 135 136 137 138 139
		if err != nil {
			klog.Fatalf("unable to set up overall controller manager: %v", err)
		}

		klog.V(0).Info("setting up scheme")
		if err := apis.AddToScheme(mgr.GetScheme()); err != nil {
			klog.Fatalf("unable add APIs to scheme: %v", err)
		}

		klog.V(0).Info("Setting up controllers")
140
		err = workspace.Add(mgr)
141 142 143 144 145 146 147
		if err != nil {
			klog.Fatal("Unable to create workspace controller")
		}

		err = namespace.Add(mgr, openpitrixClient)
		if err != nil {
			klog.Fatal("Unable to create namespace controller")
J
Jeff 已提交
148 149
		}

R
runzexia 已提交
150
		if err := AddControllers(mgr, kubernetesClient, informerFactory, devopsClient, s3Client, stopCh); err != nil {
J
Jeff 已提交
151 152 153
			klog.Fatalf("unable to register controllers to the manager: %v", err)
		}

H
hongming 已提交
154 155 156
		// Start cache data after all informer is registered
		informerFactory.Start(stopCh)

R
runzexia 已提交
157 158 159 160 161 162 163
		// Setup webhooks
		klog.Info("setting up webhook server")
		hookServer := mgr.GetWebhookServer()

		klog.Info("registering webhooks to the webhook server")
		hookServer.Register("/mutating-encrypt-password-iam-kubesphere-io-v1alpha2-user", &webhook.Admission{Handler: &user.PasswordCipher{Client: mgr.GetClient()}})
		hookServer.Register("/validate-email-iam-kubesphere-io-v1alpha2-user", &webhook.Admission{Handler: &user.EmailValidator{Client: mgr.GetClient()}})
D
Duan Jiong 已提交
164
		hookServer.Register("/validate-service-nsnp-kubesphere-io-v1alpha1-network", &webhook.Admission{Handler: &nsnetworkpolicy.ServiceValidator{}})
R
runzexia 已提交
165

166 167
		klog.V(0).Info("Starting the controllers.")
		if err = mgr.Start(stopCh); err != nil {
J
Jeff 已提交
168 169 170 171
			klog.Fatalf("unable to run the manager: %v", err)
		}

		select {}
172 173
	}

J
Jeff 已提交
174 175 176 177 178 179 180
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		<-stopCh
		cancel()
	}()
181

Z
zryfish 已提交
182 183 184 185 186
	if !s.LeaderElect {
		run(ctx)
		return nil
	}

J
Jeff 已提交
187 188
	id, err := os.Hostname()
	if err != nil {
189 190 191
		return err
	}

J
Jeff 已提交
192 193 194 195 196 197
	// add a uniquifier so that two processes on the same host don't accidentally both become active
	id = id + "_" + string(uuid.NewUUID())

	// TODO: change lockType to lease
	// once we finished moving to Kubernetes v1.16+, we
	// change lockType to lease
198
	lock, err := resourcelock.New(resourcelock.LeasesResourceLock,
J
Jeff 已提交
199
		"kubesphere-system",
200
		"ks-controller-manager",
201 202
		kubernetesClient.Kubernetes().CoreV1(),
		kubernetesClient.Kubernetes().CoordinationV1(),
J
Jeff 已提交
203 204 205 206 207 208 209 210 211
		resourcelock.ResourceLockConfig{
			Identity: id,
			EventRecorder: record.NewBroadcaster().NewRecorder(scheme.Scheme, v1.EventSource{
				Component: "ks-controller-manager",
			}),
		})

	if err != nil {
		klog.Fatalf("error creating lock: %v", err)
212 213
	}

J
Jeff 已提交
214 215
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
216 217 218
		LeaseDuration: s.LeaderElection.LeaseDuration,
		RenewDeadline: s.LeaderElection.RenewDeadline,
		RetryPeriod:   s.LeaderElection.RetryPeriod,
J
Jeff 已提交
219 220 221 222 223 224 225 226 227
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				klog.Errorf("leadership lost")
				os.Exit(0)
			},
		},
	})

228 229
	return nil
}