未验证 提交 5d954202 编写于 作者: Z zryfish 提交者: GitHub

create openpitrix runtime when new cluster added (#2204)

上级 bef5eb46
......@@ -48,18 +48,21 @@ import (
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/simple/client/devops"
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix"
"kubesphere.io/kubesphere/pkg/simple/client/s3"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/kubefed/pkg/controller/util"
)
func AddControllers(
func addControllers(
mgr manager.Manager,
client k8s.Client,
informerFactory informers.InformerFactory,
devopsClient devops.Interface,
s3Client s3.Interface,
openpitrixClient openpitrix.Client,
multiClusterEnabled bool,
networkPolicyEnabled bool,
stopCh <-chan struct{}) error {
kubernetesInformer := informerFactory.KubernetesSharedInformerFactory()
......@@ -94,33 +97,37 @@ func AddControllers(
jobController := job.NewJobController(kubernetesInformer.Batch().V1().Jobs(), client.Kubernetes())
s2iBinaryController := s2ibinary.NewController(client.Kubernetes(),
client.KubeSphere(),
kubesphereInformer.Devops().V1alpha1().S2iBinaries(),
s3Client,
)
s2iRunController := s2irun.NewS2iRunController(client.Kubernetes(),
client.KubeSphere(),
kubesphereInformer.Devops().V1alpha1().S2iBinaries(),
kubesphereInformer.Devops().V1alpha1().S2iRuns())
devopsProjectController := devopsproject.NewController(client.Kubernetes(),
client.KubeSphere(), devopsClient,
informerFactory.KubernetesSharedInformerFactory().Core().V1().Namespaces(),
informerFactory.KubeSphereSharedInformerFactory().Devops().V1alpha3().DevOpsProjects(),
)
devopsPipelineController := pipeline.NewController(client.Kubernetes(),
client.KubeSphere(),
devopsClient,
informerFactory.KubernetesSharedInformerFactory().Core().V1().Namespaces(),
informerFactory.KubeSphereSharedInformerFactory().Devops().V1alpha3().Pipelines())
var s2iBinaryController, s2iRunController, devopsProjectController, devopsPipelineController, devopsCredentialController manager.Runnable
if devopsClient != nil {
s2iBinaryController = s2ibinary.NewController(client.Kubernetes(),
client.KubeSphere(),
kubesphereInformer.Devops().V1alpha1().S2iBinaries(),
s3Client,
)
s2iRunController = s2irun.NewS2iRunController(client.Kubernetes(),
client.KubeSphere(),
kubesphereInformer.Devops().V1alpha1().S2iBinaries(),
kubesphereInformer.Devops().V1alpha1().S2iRuns())
devopsProjectController = devopsproject.NewController(client.Kubernetes(),
client.KubeSphere(), devopsClient,
informerFactory.KubernetesSharedInformerFactory().Core().V1().Namespaces(),
informerFactory.KubeSphereSharedInformerFactory().Devops().V1alpha3().DevOpsProjects(),
)
devopsPipelineController = pipeline.NewController(client.Kubernetes(),
client.KubeSphere(),
devopsClient,
informerFactory.KubernetesSharedInformerFactory().Core().V1().Namespaces(),
informerFactory.KubeSphereSharedInformerFactory().Devops().V1alpha3().Pipelines())
devopsCredentialController = devopscredential.NewController(client.Kubernetes(),
devopsClient,
informerFactory.KubernetesSharedInformerFactory().Core().V1().Namespaces(),
informerFactory.KubernetesSharedInformerFactory().Core().V1().Secrets())
devopsCredentialController := devopscredential.NewController(client.Kubernetes(),
devopsClient,
informerFactory.KubernetesSharedInformerFactory().Core().V1().Namespaces(),
informerFactory.KubernetesSharedInformerFactory().Core().V1().Secrets())
}
storageCapabilityController := capability.NewController(
client.Kubernetes(),
......@@ -223,22 +230,29 @@ func AddControllers(
kubesphereInformer.Iam().V1alpha2().RoleBases(), kubesphereInformer.Iam().V1alpha2().WorkspaceRoles(),
fedWorkspaceCache, fedWorkspaceCacheController, multiClusterEnabled)
clusterController := cluster.NewClusterController(
client.Kubernetes(),
client.Config(),
kubesphereInformer.Cluster().V1alpha1().Clusters(),
client.KubeSphere().ClusterV1alpha1().Clusters())
nsnpProvider, err := provider.NewNsNetworkPolicyProvider(client.Kubernetes(),
kubernetesInformer.Networking().V1().NetworkPolicies())
if err != nil {
return err
var clusterController manager.Runnable
if multiClusterEnabled {
clusterController = cluster.NewClusterController(
client.Kubernetes(),
client.Config(),
kubesphereInformer.Cluster().V1alpha1().Clusters(),
client.KubeSphere().ClusterV1alpha1().Clusters(),
openpitrixClient)
}
var nsnpController manager.Runnable
if networkPolicyEnabled {
nsnpProvider, err := provider.NewNsNetworkPolicyProvider(client.Kubernetes(), kubernetesInformer.Networking().V1().NetworkPolicies())
if err != nil {
return err
}
nsnpController = nsnetworkpolicy.NewNSNetworkPolicyController(client.Kubernetes(),
client.KubeSphere().NetworkV1alpha1(), kubesphereInformer.Network().V1alpha1().NamespaceNetworkPolicies(),
kubernetesInformer.Core().V1().Services(), kubernetesInformer.Core().V1().Nodes(),
kubesphereInformer.Tenant().V1alpha1().Workspaces(),
kubernetesInformer.Core().V1().Namespaces(), nsnpProvider)
}
nsnpController := nsnetworkpolicy.NewNSNetworkPolicyController(client.Kubernetes(),
client.KubeSphere().NetworkV1alpha1(), kubesphereInformer.Network().V1alpha1().NamespaceNetworkPolicies(),
kubernetesInformer.Core().V1().Services(), kubernetesInformer.Core().V1().Nodes(),
kubesphereInformer.Tenant().V1alpha1().Workspaces(),
kubernetesInformer.Core().V1().Namespaces(), nsnpProvider)
controllers := map[string]manager.Runnable{
"virtualservice-controller": vsController,
......@@ -274,6 +288,11 @@ func AddControllers(
}
for name, ctrl := range controllers {
if ctrl == nil {
klog.V(4).Infof("%s is not going to run due to dependent component disabled.", name)
continue
}
if err := mgr.Add(ctrl); err != nil {
klog.Error(err, "add controller to manager failed", "name", name)
return err
......
......@@ -10,6 +10,7 @@ import (
"kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins"
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
"kubesphere.io/kubesphere/pkg/simple/client/multicluster"
"kubesphere.io/kubesphere/pkg/simple/client/network"
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix"
"kubesphere.io/kubesphere/pkg/simple/client/s3"
"strings"
......@@ -21,9 +22,11 @@ type KubeSphereControllerManagerOptions struct {
DevopsOptions *jenkins.Options
S3Options *s3.Options
OpenPitrixOptions *openpitrix.Options
NetworkOptions *network.Options
MultiClusterOptions *multicluster.Options
LeaderElect bool
LeaderElection *leaderelection.LeaderElectionConfig
WebhookCertDir string
}
func NewKubeSphereControllerManagerOptions() *KubeSphereControllerManagerOptions {
......@@ -32,13 +35,15 @@ func NewKubeSphereControllerManagerOptions() *KubeSphereControllerManagerOptions
DevopsOptions: jenkins.NewDevopsOptions(),
S3Options: s3.NewS3Options(),
OpenPitrixOptions: openpitrix.NewOptions(),
NetworkOptions: network.NewNetworkOptions(),
MultiClusterOptions: multicluster.NewOptions(),
LeaderElection: &leaderelection.LeaderElectionConfig{
LeaseDuration: 30 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,
},
LeaderElect: false,
LeaderElect: false,
WebhookCertDir: "",
}
return s
......@@ -58,6 +63,7 @@ func (s *KubeSphereControllerManagerOptions) Flags() cliflag.NamedFlagSets {
s.DevopsOptions.AddFlags(fss.FlagSet("devops"), s.DevopsOptions)
s.S3Options.AddFlags(fss.FlagSet("s3"), s.S3Options)
s.OpenPitrixOptions.AddFlags(fss.FlagSet("openpitrix"), s.OpenPitrixOptions)
s.NetworkOptions.AddFlags(fss.FlagSet("network"), s.NetworkOptions)
s.MultiClusterOptions.AddFlags(fss.FlagSet("multicluster"), s.MultiClusterOptions)
fs := fss.FlagSet("leaderelection")
......@@ -67,6 +73,11 @@ func (s *KubeSphereControllerManagerOptions) Flags() cliflag.NamedFlagSets {
"Whether to enable leader election. This field should be enabled when controller manager"+
"deployed with multiple replicas.")
fs.StringVar(&s.WebhookCertDir, "webhook-cert-dir", s.WebhookCertDir, ""+
"Certificate directory used to setup webhooks, need tls.crt and tls.key placed inside."+
"if not set, webhook server would look up the server key and certificate in"+
"{TempDir}/k8s-webhook-server/serving-certs")
kfs := fss.FlagSet("klog")
local := flag.NewFlagSet("klog", flag.ExitOnError)
klog.InitFlags(local)
......@@ -84,6 +95,7 @@ func (s *KubeSphereControllerManagerOptions) Validate() []error {
errs = append(errs, s.KubernetesOptions.Validate()...)
errs = append(errs, s.S3Options.Validate()...)
errs = append(errs, s.OpenPitrixOptions.Validate()...)
errs = append(errs, s.NetworkOptions.Validate()...)
return errs
}
......
......@@ -40,6 +40,7 @@ import (
"kubesphere.io/kubesphere/pkg/simple/client/devops"
"kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins"
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix"
"kubesphere.io/kubesphere/pkg/simple/client/s3"
"kubesphere.io/kubesphere/pkg/utils/term"
"os"
......@@ -58,6 +59,7 @@ func NewControllerManagerCommand() *cobra.Command {
DevopsOptions: conf.DevopsOptions,
S3Options: conf.S3Options,
OpenPitrixOptions: conf.OpenPitrixOptions,
NetworkOptions: conf.NetworkOptions,
MultiClusterOptions: conf.MultiClusterOptions,
LeaderElection: s.LeaderElection,
LeaderElect: s.LeaderElect,
......@@ -111,6 +113,15 @@ func Run(s *options.KubeSphereControllerManagerOptions, stopCh <-chan struct{})
}
}
var openpitrixClient openpitrix.Client
if s.OpenPitrixOptions != nil && !s.OpenPitrixOptions.IsEmpty() {
openpitrixClient, err = openpitrix.NewClient(s.OpenPitrixOptions)
if err != nil {
klog.Errorf("Failed to create openpitrix client %v", err)
return err
}
}
var s3Client s3.Interface
if s.S3Options != nil && len(s.S3Options.Endpoint) != 0 {
s3Client, err = s3.NewS3Client(s.S3Options)
......@@ -125,7 +136,7 @@ func Run(s *options.KubeSphereControllerManagerOptions, stopCh <-chan struct{})
run := func(ctx context.Context) {
klog.V(0).Info("setting up manager")
mgr, err := manager.New(kubernetesClient.Config(), manager.Options{})
mgr, err := manager.New(kubernetesClient.Config(), manager.Options{CertDir: s.WebhookCertDir})
if err != nil {
klog.Fatalf("unable to set up overall controller manager: %v", err)
}
......@@ -146,7 +157,7 @@ func Run(s *options.KubeSphereControllerManagerOptions, stopCh <-chan struct{})
klog.Fatal("Unable to create namespace controller")
}
if err := AddControllers(mgr, kubernetesClient, informerFactory, devopsClient, s3Client, s.MultiClusterOptions.Enable, stopCh); err != nil {
if err := addControllers(mgr, kubernetesClient, informerFactory, devopsClient, s3Client, openpitrixClient, s.MultiClusterOptions.Enable, s.NetworkOptions.EnableNetworkPolicy, stopCh); err != nil {
klog.Fatalf("unable to register controllers to the manager: %v", err)
}
......@@ -190,9 +201,6 @@ func Run(s *options.KubeSphereControllerManagerOptions, stopCh <-chan struct{})
// add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + string(uuid.NewUUID())
// TODO: change lockType to lease
// once we finished moving to Kubernetes v1.16+, we
// change lockType to lease
lock, err := resourcelock.New(resourcelock.LeasesResourceLock,
"kubesphere-system",
"ks-controller-manager",
......
......@@ -92,6 +92,9 @@ const (
// Cluster is all available for requests
ClusterReady ClusterConditionType = "Ready"
// Openpitrix runtime is created
ClusterOpenPitrixRuntimeReady ClusterConditionType = "OpenPitrixRuntimeReady"
)
type ClusterCondition struct {
......
......@@ -26,6 +26,7 @@ import (
clusterclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned/typed/cluster/v1alpha1"
clusterinformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/cluster/v1alpha1"
clusterlister "kubesphere.io/kubesphere/pkg/client/listers/cluster/v1alpha1"
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix"
"math/rand"
"net/http"
"reflect"
......@@ -50,7 +51,8 @@ const (
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
kubefedNamespace = "kube-federation-system"
kubefedNamespace = "kube-federation-system"
openpitrixRuntime = "openpitrix.io/runtime"
// Actually host cluster name can be anything, there is only necessary when calling JoinFederation function
hostClusterName = "kubesphere"
......@@ -98,6 +100,8 @@ type clusterController struct {
clusterLister clusterlister.ClusterLister
clusterHasSynced cache.InformerSynced
openpitrixClient openpitrix.Client
queue workqueue.RateLimitingInterface
workerLoopPeriod time.Duration
......@@ -112,6 +116,7 @@ func NewClusterController(
config *rest.Config,
clusterInformer clusterinformer.ClusterInformer,
clusterClient clusterclient.ClusterInterface,
openpitrixClient openpitrix.Client,
) *clusterController {
broadcaster := record.NewBroadcaster()
......@@ -127,6 +132,7 @@ func NewClusterController(
client: client,
hostConfig: config,
clusterClient: clusterClient,
openpitrixClient: openpitrixClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster"),
workerLoopPeriod: time.Second,
clusterMap: make(map[string]*clusterData),
......@@ -316,6 +322,18 @@ func (c *clusterController) syncCluster(key string) error {
}
}
// clean up openpitrix runtime of the cluster
if _, ok := cluster.Annotations[openpitrixRuntime]; ok {
if c.openpitrixClient != nil {
err = c.openpitrixClient.CleanupRuntime(cluster.Name)
if err != nil {
klog.Errorf("Unable to delete openpitrix for cluster %s, error %v", cluster.Name, err)
return err
}
}
delete(cluster.Annotations, openpitrixRuntime)
}
// remove our cluster finalizer
finalizers := sets.NewString(cluster.ObjectMeta.Finalizers...)
finalizers.Delete(clusterv1alpha1.Finalizer)
......@@ -532,6 +550,22 @@ func (c *clusterController) syncCluster(key string) error {
c.updateClusterCondition(cluster, clusterNotReadyCondition)
}
if c.openpitrixClient != nil { // OpenPitrix is enabled, create runtime
if cluster.GetAnnotations() == nil {
cluster.Annotations = make(map[string]string)
}
if _, ok = cluster.Annotations[openpitrixRuntime]; !ok {
err = c.openpitrixClient.UpsertRuntime(cluster.Name, string(cluster.Spec.Connection.KubeConfig))
if err != nil {
klog.Errorf("Failed to create runtime for cluster %s, error %v", cluster.Name, err)
return err
} else {
cluster.Annotations[openpitrixRuntime] = cluster.Name
}
}
}
if !reflect.DeepEqual(oldCluster, cluster) {
_, err = c.clusterClient.Update(cluster)
if err != nil {
......
-----BEGIN CERTIFICATE-----
MIICyDCCAbCgAwIBAgIBADANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDEwprdWJl
c3BoZXJlMB4XDTIwMDMyNTEwMTcxNloXDTMwMDMyMzEwMTcxNlowFTETMBEGA1UE
AxMKa3ViZXNwaGVyZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAM5t
5FTkBKKAhxbuL4V5ltOLGAfJVhCp+FOQPKRjRjtsle+TkhdnBx87VVMKDVTyQUxX
ZVq4/S89XomwLhiwUmV9KN9Dv+fsFlD44Bz2P7PtufRv12BRJxGjl5GrZfKOyZ5r
7rxEWLEpkDshcJyHL8hJau1KRTuR8o3mZxIUXQ0nHKRKqC/wpVHvcLCGkAj67RWx
JENl1iw/0domw+3K/ziZUnytOtZ5OFDD9YFWdRWNvt6T8cdFi0PVqTClZ6RlZXQT
lVfzQrrHMJfM+Q/2faldGbjZ7jQWxiMXZB3nRgLVL9aPjUJM6yaD5adeY6wFTN2b
W38JDAcpSVZG1mAOA9ECAwEAAaMjMCEwDgYDVR0PAQH/BAQDAgKkMA8GA1UdEwEB
/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAGRQARoCSLAUhinhSGg2Px/yQetA
6EQ9O+9JjY0w3ZKR0qkZxUmIECQ41LxzuArvJtuQDSwxVct0poYEaVLVfJQyCl5P
TTjvBPkuBkIw7511FqturIVtLYkRBSoLb1ygBKbqpknEcLGkQQ7fPG5Oqioo0PNg
xbrWQICG+JfR07sKumNYwOt6ixCcGciPDvcSo6KBUgm31C89abhodJMiDsW36l/d
Uly0sqQqKXwtTNBTr6dMaSDKndbybQg+I7fQ5xqxHIA0DPH3AVGB5nu1Lb+EzyAc
0emX5TTH2IhaHR7/2mDr1ks29DSqMlEi4qrvEyJ+H/Aaj2xVuAIkN95z7FI=
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAzm3kVOQEooCHFu4vhXmW04sYB8lWEKn4U5A8pGNGO2yV75OS
F2cHHztVUwoNVPJBTFdlWrj9Lz1eibAuGLBSZX0o30O/5+wWUPjgHPY/s+259G/X
YFEnEaOXkatl8o7JnmvuvERYsSmQOyFwnIcvyElq7UpFO5HyjeZnEhRdDSccpEqo
L/ClUe9wsIaQCPrtFbEkQ2XWLD/R2ibD7cr/OJlSfK061nk4UMP1gVZ1FY2+3pPx
x0WLQ9WpMKVnpGVldBOVV/NCuscwl8z5D/Z9qV0ZuNnuNBbGIxdkHedGAtUv1o+N
QkzrJoPlp15jrAVM3ZtbfwkMBylJVkbWYA4D0QIDAQABAoIBAQCKmRnvLVE3Uhhh
evaJJYPw8LjnsXSJp7VFVdiBfu4Ow+8/ddNmxxPvEkVNhbhByo2xFhTE/uNvLkMl
KxcoGGqaHZitnq1gqBCruQ2wJF3NDtYFmnn+/z2+CBqiAXRnkRsptMoLuc/a0081
hFQ5pEKetTyHqZAmWO2D/KhDaUy4oM4AlHM7ztoa9QYls9yK3Iju8CtY4bBohCd+
UVOJXM7r77OPoNedSgXS0Qqodh/xSQkc3poB++fhrmvSdny2VVbIYiMrNForXXMt
H9gwv4EZHV9j/fSHKzZZKIFjQa7PX9qlTIkVTVq40aP1YCGUADCnQbynAqV2x7aI
FuKYvZxJAoGBANF3nPM27e133L68x6zKqBDC9jmvMau1mD+G0zpWbMIm4tLr/8TI
JmhNida+J3XSKNCD3/sn5VK0IzarrLOy9Gle+xptGfWnXQ+seGKv3sZ9TIZHxLy7
c+umJ1R3MTNZZCH/wMFoUAZp915LeieX2qbhgXxiNpwUcmgGbuY6huHnAoGBAPxJ
gm8w65koWjPZsWZ61dupuZCdvTeEiGknuNn9qxpsy2kriVwMkY7hx9YHm//QRndU
99p13xBfoK6r6jKkFgVYnO73I5l6drclbBWKOk9LoUmyj7opmG454B3rCtD4sH4V
Ufz0p5dPh9u2fNVGgSsr9/cZ8L7ydpKnoKzttaWHAoGBAJ6816sU6G/6UsYVB/ix
R3YUWUZTtv+aSXty8CYYkqj7kQ8om5aD219Vy2x6hCrsjdOe7KFHlaj8LGdk6Cby
KfikawSWw4J/R632FPEsuFWRZOmp/7TLFcjyDIIdJaL1TkXZrDillWeW7EiASF0y
3hdzc45QW3QipqSgo3BD0C3nAoGAcmx0uDGb6Y8fujyV9UDpFgPYofqpAzEdHXRi
ppwiqo2FhGG2nwUkT4WfuTciTdgM3bqv32lujmKQ4igexHEfe7VBGDEG+GlOxqo2
Z16WK7jFokNWZLzQ4c1mFKux8C2a/tR0pyHfkQUFPWV7INAlbYs3n98oDbKyjLly
anw10qkCgYEAijYeIU+njMLLcWiIHlDfrNA3Se7oGflbclIcT/JjyUwFOkE5YnQO
n+PdfG4tGg5XHp+bts/2zV6t5noPKVq1uid+nlR+NHGRign+UjwSGblEshShF8Yl
LbOiLqBMi6XP23glm/jXHSezjG/1uvkW7+92jmNlh6Knax7+YJcxqJ8=
-----END RSA PRIVATE KEY-----
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册