diff --git a/pkg/controller/cluster/cluster_controller.go b/pkg/controller/cluster/cluster_controller.go
index 390967d52d1d8c5d60e0607940c7d3293e157515..ea3d021a4a430d36e81b938d4ab09f522b6b5c7b 100644
--- a/pkg/controller/cluster/cluster_controller.go
+++ b/pkg/controller/cluster/cluster_controller.go
@@ -34,6 +34,14 @@ import (
 	"time"
 )
 
+// The cluster controller only runs in multicluster mode. It follows these steps:
+//   1. Populate the proxy spec if the cluster connection type is proxy
+//   1.1 Wait for the cluster agent to become ready if the connection type is proxy
+//   2. Join the cluster into the federation control plane once the kubeconfig is ready
+//   3. Pull the cluster version and configz, and record the results in the cluster status
+// It also puts all clusters back into the queue every 5 * time.Minute to resync their status,
+// which is needed in case no cluster changes are made at all.
+
 const (
 	// maxRetries is the number of times a service will be retried before it is dropped out of the queue.
 	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
@@ -44,6 +52,7 @@ const (
 
 	kubefedNamespace = "kube-federation-system"
 
+	// The host cluster name can actually be anything; it is only needed when calling the JoinFederation function
 	hostClusterName = "kubesphere"
 
 	// allocate kubernetesAPIServer port in range [portRangeMin, portRangeMax] for agents if port is not specified
@@ -51,7 +60,7 @@ const (
 	portRangeMin = 6000
 	portRangeMax = 7000
 
-	// Service port
+	// Proxy service port
 	kubernetesPort = 6443
 	kubespherePort = 80
 
@@ -307,6 +316,7 @@ func (c *clusterController) syncCluster(key string) error {
 			}
 		}
 
+		// remove our cluster finalizer
 		finalizers := sets.NewString(cluster.ObjectMeta.Finalizers...)
 		finalizers.Delete(clusterv1alpha1.Finalizer)
 		cluster.ObjectMeta.Finalizers = finalizers.List()
@@ -317,10 +327,13 @@ func (c *clusterController) syncCluster(key string) error {
 		return nil
 	}
 
+	// save an old copy of the cluster
 	oldCluster := cluster.DeepCopy()
 
 	// prepare for proxy to member cluster
 	if cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeProxy {
+
+		// allocate ports for the kubernetes and kubesphere endpoints
 		if cluster.Spec.Connection.KubeSphereAPIServerPort == 0 ||
 			cluster.Spec.Connection.KubernetesAPIServerPort == 0 {
 			port, err := c.allocatePort()
@@ -338,6 +351,7 @@ func (c *clusterController) syncCluster(key string) error {
 			cluster.Spec.Connection.Token = c.generateToken()
 		}
 
+		// create a proxy service spec
 		mcService := v1.Service{
 			ObjectMeta: metav1.ObjectMeta{
 				Name: serviceName,
@@ -370,7 +384,7 @@ func (c *clusterController) syncCluster(key string) error {
 		}
 
 		service, err := c.client.CoreV1().Services(defaultAgentNamespace).Get(serviceName, metav1.GetOptions{})
-		if err != nil {
+		if err != nil { // failed to get the proxy service
 			if errors.IsNotFound(err) {
 				service, err = c.client.CoreV1().Services(defaultAgentNamespace).Create(&mcService)
 				if err != nil {
@@ -379,7 +393,7 @@ func (c *clusterController) syncCluster(key string) error {
 			}
 
 			return err
-		} else {
+		} else { // update the existing proxy service
 			if !reflect.DeepEqual(service.Spec, mcService.Spec) {
 				mcService.ObjectMeta = service.ObjectMeta
 				mcService.Spec.ClusterIP = service.Spec.ClusterIP
@@ -391,7 +405,7 @@ func (c *clusterController) syncCluster(key string) error {
 			}
 		}
 
-		// populated the kubernetes apiEndpoint and kubesphere apiEndpoint
+		// populate the kubernetes apiEndpoint and kubesphere apiEndpoint
 		cluster.Spec.Connection.KubernetesAPIEndpoint = fmt.Sprintf("https://%s:%d", service.Spec.ClusterIP, kubernetesPort)
 		cluster.Spec.Connection.KubeSphereAPIEndpoint = fmt.Sprintf("http://%s:%d", service.Spec.ClusterIP, kubespherePort)
 
@@ -415,10 +429,12 @@ func (c *clusterController) syncCluster(key string) error {
 		}
 	}
 
+	// kubeconfig not ready, nothing to do
 	if len(cluster.Spec.Connection.KubeConfig) == 0 {
 		return nil
 	}
 
+	// build up the cached cluster data if there isn't any yet
 	c.mu.Lock()
 	clusterDt, ok := c.clusterMap[cluster.Name]
 	if !ok || clusterDt == nil || !equality.Semantic.DeepEqual(clusterDt.cachedKubeconfig, cluster.Spec.Connection.KubeConfig) {
@@ -502,6 +518,19 @@ func (c *clusterController) syncCluster(key string) error {
 		c.updateClusterCondition(cluster, clusterReadyCondition)
 	}
 
+	if !isConditionTrue(cluster, clusterv1alpha1.ClusterAgentAvailable) {
+		clusterNotReadyCondition := clusterv1alpha1.ClusterCondition{
+			Type:               clusterv1alpha1.ClusterReady,
+			Status:             v1.ConditionFalse,
+			LastUpdateTime:     metav1.Now(),
+			LastTransitionTime: metav1.Now(),
+			Reason:             "Unable to establish connection with cluster",
+			Message:            "Cluster is not available now",
+		}
+
+		c.updateClusterCondition(cluster, clusterNotReadyCondition)
+	}
+
 	if !reflect.DeepEqual(oldCluster, cluster) {
 		_, err = c.clusterClient.Update(cluster)
 		if err != nil {
@@ -513,6 +542,7 @@ func (c *clusterController) syncCluster(key string) error {
 	return nil
 }
 
+// tryToFetchKubeSphereComponents sends requests to the member cluster's configz API through the kube-apiserver proxy
 func (c *clusterController) tryToFetchKubeSphereComponents(host string, transport http.RoundTripper) (map[string]bool, error) {
 	client := http.Client{
 		Transport: transport,
@@ -569,6 +599,7 @@ func (c *clusterController) handleErr(err error, key interface{}) {
 	utilruntime.HandleError(err)
}
 
+// isConditionTrue reports whether the given cluster condition is True; it returns false if the condition does not exist
 func isConditionTrue(cluster *clusterv1alpha1.Cluster, conditionType clusterv1alpha1.ClusterConditionType) bool {
 	for _, condition := range cluster.Status.Conditions {
 		if condition.Type == conditionType && condition.Status == v1.ConditionTrue {
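
The header comment's 5-minute requeue is the only resync signal when no watch events arrive. A minimal sketch of that pattern, assuming a `workqueue.RateLimitingInterface` and a hypothetical `listKeys` helper standing in for the controller's cluster lister (neither appears in this diff):

```go
package sketch

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

// startResync puts every known cluster key back into the queue on a fixed
// period so syncCluster still runs when no cluster changes are made.
// listKeys is a hypothetical stand-in for the controller's cluster lister.
func startResync(queue workqueue.RateLimitingInterface, listKeys func() []string, stopCh <-chan struct{}) {
	wait.Until(func() {
		for _, key := range listKeys() {
			queue.Add(key) // each key flows through syncCluster again
		}
	}, 5*time.Minute, stopCh)
}
```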
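The Get/Create/Update hunks above implement a standard ensure-Service pattern. Here it is extracted into a standalone sketch for clarity — `ensureProxyService` is an illustrative name, the real logic stays inline in `syncCluster`, and the context-free client-go signatures mirror the ones this file already uses:

```go
package sketch

import (
	"reflect"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// ensureProxyService converges the live Service on the desired spec:
// create it when absent, update it only when the spec has drifted.
func ensureProxyService(client kubernetes.Interface, namespace string, desired *v1.Service) (*v1.Service, error) {
	service, err := client.CoreV1().Services(namespace).Get(desired.Name, metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			// no proxy service yet, create one from the desired spec
			return client.CoreV1().Services(namespace).Create(desired)
		}
		// any other error is handed back so the workqueue can retry
		return nil, err
	}

	if !reflect.DeepEqual(service.Spec, desired.Spec) {
		// carry over server-populated fields so the update is accepted:
		// ObjectMeta keeps the resourceVersion, and ClusterIP is immutable
		desired.ObjectMeta = service.ObjectMeta
		desired.Spec.ClusterIP = service.Spec.ClusterIP
		return client.CoreV1().Services(namespace).Update(desired)
	}

	return service, nil
}
```

Copying `ClusterIP` back is what makes the update in the diff work at all: `Service.spec.clusterIP` is immutable, so an Update that omits it would be rejected by the API server.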
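The diff truncates `tryToFetchKubeSphereComponents` right after the `http.Client` is built. A sketch of the rest of the flow under stated assumptions — the configz URL path and the timeout are guesses, not taken from this diff; only the proxy-transport idea comes from the doc comment added above the function:

```go
package sketch

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// fetchComponents GETs the member cluster's configz endpoint through the
// kube-apiserver proxy transport and decodes the component->enabled map.
func fetchComponents(host string, transport http.RoundTripper) (map[string]bool, error) {
	client := http.Client{
		Transport: transport,
		Timeout:   5 * time.Second, // assumed; the timeout is not shown in the diff
	}

	// the exact configz path is an assumption for illustration
	resp, err := client.Get(fmt.Sprintf("%s/kapis/config.kubesphere.io/v1alpha2/configs/configz", host))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	configz := make(map[string]bool)
	if err := json.NewDecoder(resp.Body).Decode(&configz); err != nil {
		return nil, err
	}
	return configz, nil
}
```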