未验证 提交 5d9445d1 编写于 作者: K KubeSphere CI Bot 提交者: GitHub

Merge pull request #3254 from yuswift/feature/validate-member_cluster-config

validate member cluster config while joining member clusters.
......@@ -18,36 +18,45 @@ package v1alpha1
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/emicklei/go-restful"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/emicklei/go-restful"
"gopkg.in/yaml.v2"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/cli-runtime/pkg/printers"
k8sinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"kubesphere.io/kubesphere/pkg/api"
"kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1"
"kubesphere.io/kubesphere/pkg/apiserver/config"
"kubesphere.io/kubesphere/pkg/client/informers/externalversions"
clusterlister "kubesphere.io/kubesphere/pkg/client/listers/cluster/v1alpha1"
"kubesphere.io/kubesphere/pkg/version"
"net/http"
"net/url"
"strings"
"time"
"k8s.io/cli-runtime/pkg/printers"
)
const (
defaultAgentImage = "kubesphere/tower:v1.0"
defaultTimeout = 10 * time.Second
KubesphereNamespace = "kubesphere-system"
KubeSphereConfigName = "kubesphere-config"
KubeSphereApiServer = "ks-apiserver"
)
var errClusterConnectionIsNotProxy = fmt.Errorf("cluster is not using proxy connection")
......@@ -55,21 +64,25 @@ var errClusterConnectionIsNotProxy = fmt.Errorf("cluster is not using proxy conn
type handler struct {
serviceLister v1.ServiceLister
clusterLister clusterlister.ClusterLister
configMapLister v1.ConfigMapLister
proxyService string
proxyAddress string
agentImage string
yamlPrinter *printers.YAMLPrinter
}
func newHandler(serviceLister v1.ServiceLister, clusterLister clusterlister.ClusterLister, proxyService, proxyAddress, agentImage string) *handler {
func newHandler(k8sInformers k8sinformers.SharedInformerFactory, ksInformers externalversions.SharedInformerFactory, proxyService, proxyAddress, agentImage string) *handler {
if len(agentImage) == 0 {
agentImage = defaultAgentImage
}
return &handler{
serviceLister: serviceLister,
clusterLister: clusterLister,
serviceLister: k8sInformers.Core().V1().Services().Lister(),
clusterLister: ksInformers.Cluster().V1alpha1().Clusters().Lister(),
configMapLister: k8sInformers.Core().V1().ConfigMaps().Lister(),
proxyService: proxyService,
proxyAddress: proxyAddress,
agentImage: agentImage,
......@@ -269,6 +282,11 @@ func (h *handler) validateCluster(request *restful.Request, response *restful.Re
return
}
err = h.validateMemberClusterConfiguration(cluster.Spec.Connection.KubeConfig)
if err != nil {
api.HandleBadRequest(response, request, fmt.Errorf("failed to validate member cluster configuration, err: %v", err))
}
response.WriteHeader(http.StatusOK)
}
......@@ -348,7 +366,7 @@ func validateKubeSphereAPIServer(ksEndpoint string, kubeconfig []byte) (*version
}
client.Transport = transport
path = fmt.Sprintf("%s/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:/proxy/kapis/version", config.Host)
path = fmt.Sprintf("%s/api/v1/namespaces/%s/services/:%s:/proxy/kapis/version", config.Host, KubesphereNamespace, KubeSphereApiServer)
}
response, err := client.Get(path)
......@@ -362,14 +380,83 @@ func validateKubeSphereAPIServer(ksEndpoint string, kubeconfig []byte) (*version
response.Body = ioutil.NopCloser(bytes.NewBuffer(responseBytes))
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("invalid response: %s , please make sure ks-apiserver.kubesphere-system.svc of member cluster is up and running", responseBody)
return nil, fmt.Errorf("invalid response: %s , please make sure %s.%s.svc of member cluster is up and running", KubeSphereApiServer, KubesphereNamespace, responseBody)
}
ver := version.Info{}
err = json.NewDecoder(response.Body).Decode(&ver)
if err != nil {
return nil, fmt.Errorf("invalid response: %s , please make sure ks-apiserver.kubesphere-system.svc of member cluster is up and running", responseBody)
return nil, fmt.Errorf("invalid response: %s , please make sure %s.%s.svc of member cluster is up and running", KubeSphereApiServer, KubesphereNamespace, responseBody)
}
return &ver, nil
}
// validateMemberClusterConfiguration compares host and member cluster jwt, if they are not same, it changes member
// cluster jwt to host's, then restart member cluster ks-apiserver.
func (h *handler) validateMemberClusterConfiguration(memberKubeconfig []byte) error {
hConfig, err := h.getHostClusterConfig()
if err != nil {
return err
}
mConfig, err := h.getMemberClusterConfig(memberKubeconfig)
if err != nil {
return err
}
if hConfig.AuthenticationOptions.JwtSecret != mConfig.AuthenticationOptions.JwtSecret {
return fmt.Errorf("hostcluster Jwt is not equal to member cluster jwt, please edit the member cluster cluster config")
}
return nil
}
// getMemberClusterConfig returns KubeSphere running config by the given member cluster kubeconfig
func (h *handler) getMemberClusterConfig(kubeconfig []byte) (*config.Config, error) {
config, err := loadKubeConfigFromBytes(kubeconfig)
if err != nil {
return nil, err
}
config.Timeout = defaultTimeout
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
memberCm, err := clientSet.CoreV1().ConfigMaps(KubesphereNamespace).Get(context.Background(), KubeSphereConfigName, metav1.GetOptions{})
if err != nil {
return nil, err
}
return getConfigFromCm(memberCm)
}
// getHostClusterConfig returns KubeSphere running config from host cluster ConfigMap
func (h *handler) getHostClusterConfig() (*config.Config, error) {
hostCm, err := h.configMapLister.ConfigMaps(KubesphereNamespace).Get(KubeSphereConfigName)
if err != nil {
return nil, fmt.Errorf("failed to get host cluster %s/configmap/%s, err: %s", KubesphereNamespace, KubeSphereConfigName, err)
}
return getConfigFromCm(hostCm)
}
// getConfigFromCm returns KubeSphere ruuning config by the given ConfigMap.
func getConfigFromCm(cm *corev1.ConfigMap) (*config.Config, error) {
Config := config.New()
value, ok := cm.Data["kubesphere.yaml"]
if !ok {
return nil, fmt.Errorf("failed to get %s/configmap/%s kubesphere.yaml value", KubesphereNamespace, KubeSphereConfigName)
}
err := yaml.Unmarshal([]byte(value), Config)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal value from %s/configmap/%s. err: %s", KubesphereNamespace, KubeSphereConfigName, err)
}
return Config, nil
}
......@@ -18,22 +18,29 @@ package v1alpha1
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/google/go-cmp/cmp"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/printers"
k8s "k8s.io/client-go/kubernetes"
k8sfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/version"
"net/http"
"net/http/httptest"
"net/url"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"testing"
)
const (
......@@ -79,6 +86,93 @@ var service = &corev1.Service{
},
}
var hostMap = map[string]string{
"kubesphere.yaml": `
monitoring:
endpoint: http://prometheus-operated.kubesphere-monitoring-system.svc:9090
authentication:
jwtSecret: sQh3JOqNbmci6Gu94TeV10AY7ipltwjp
oauthOptions:
accessTokenMaxAge: 0s
accessTokenInactivityTimeout: 0s
`,
}
var memberMap = map[string]string{
"kubesphere.yaml": `
monitoring:
endpoint: http://prometheus-operated.kubesphere-monitoring-system.svc:9090
authentication:
jwtSecret: sQh3JOqNbmci6Gu94TeV10AY7ipltwj
oauthOptions:
accessTokenMaxAge: 0s
accessTokenInactivityTimeout: 0s
`,
}
var hostCm = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: KubesphereNamespace,
Name: KubeSphereConfigName,
},
Data: hostMap,
}
var memberCm = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: KubesphereNamespace,
Name: KubeSphereConfigName,
},
Data: memberMap,
}
var ksApiserverDeploy = `
{
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {
"labels": {
"app": "ks-apiserver",
"tier": "backend",
"version": "v3.0.0"
},
"name": "ks-apiserver",
"namespace": "kubesphere-system"
},
"spec": {
"replicas": 1,
"selector": {
"matchLabels": {
"app": "ks-apiserver",
"tier": "backend",
"version": "v3.0.0"
}
},
"template": {
"metadata": {
"creationTimestamp": null,
"labels": {
"app": "ks-apiserver",
"tier": "backend",
"version": "v3.0.0"
}
},
"spec": {
"containers": [
{
"command": [
"ks-apiserver",
"--logtostderr=true"
],
"image": "kubesphere/ks-apiserver:v3.0.0",
"name": "ks-apiserver"
}
]
}
}
}
}`
var expected = `apiVersion: apps/v1
kind: Deployment
metadata:
......@@ -157,8 +251,8 @@ func TestGeranteAgentDeployment(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
h := newHandler(informersFactory.KubernetesSharedInformerFactory().Core().V1().Services().Lister(),
informersFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters().Lister(),
h := newHandler(informersFactory.KubernetesSharedInformerFactory(),
informersFactory.KubeSphereSharedInformerFactory(),
proxyService,
"",
agentImage)
......@@ -238,8 +332,8 @@ func TestValidateKubeConfig(t *testing.T) {
informersFactory.KubernetesSharedInformerFactory().Core().V1().Services().Informer().GetIndexer().Add(service)
informersFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters().Informer().GetIndexer().Add(cluster)
h := newHandler(informersFactory.KubernetesSharedInformerFactory().Core().V1().Services().Lister(),
informersFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters().Lister(),
h := newHandler(informersFactory.KubernetesSharedInformerFactory(),
informersFactory.KubeSphereSharedInformerFactory(),
proxyService,
"",
agentImage)
......@@ -303,3 +397,109 @@ func TestValidateKubeSphereEndpoint(t *testing.T) {
}
}
func TestValidateMemberClusterConfiguration(t *testing.T) {
k8sclient := k8sfake.NewSimpleClientset(service)
ksclient := fake.NewSimpleClientset(cluster)
informersFactory := informers.NewInformerFactories(k8sclient, ksclient, nil, nil, nil, nil)
informersFactory.KubernetesSharedInformerFactory().Core().V1().Services().Informer().GetIndexer().Add(service)
informersFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters().Informer().GetIndexer().Add(cluster)
informersFactory.KubernetesSharedInformerFactory().Core().V1().ConfigMaps().Informer().GetIndexer().Add(hostCm)
h := newHandler(informersFactory.KubernetesSharedInformerFactory(),
informersFactory.KubeSphereSharedInformerFactory(),
proxyService,
"",
agentImage)
config, err := loadKubeConfigFromBytes([]byte(base64EncodedKubeConfig))
if err != nil {
t.Fatal(err)
}
// config.Host is schemaless, we need to add schema manually
u, err := url.Parse(fmt.Sprintf("http://%s", config.Host))
if err != nil {
t.Fatal(err)
}
// we need to specify apiserver port to match above kubeconfig
env := &envtest.Environment{
Config: config,
ControlPlane: envtest.ControlPlane{
APIServer: &envtest.APIServer{
Args: envtest.DefaultKubeAPIServerFlags,
URL: u,
},
},
}
cfg, err := env.Start()
if err != nil {
t.Log(cfg)
t.Fatal(err)
}
defer func() {
_ = env.Stop()
}()
addMemberClusterResource(hostCm, t)
err = h.validateMemberClusterConfiguration([]byte(base64EncodedKubeConfig))
if err != nil {
t.Fatal(err)
}
addMemberClusterResource(memberCm, t)
err = h.validateMemberClusterConfiguration([]byte(base64EncodedKubeConfig))
if err == nil {
t.Fatal()
}
t.Log(err)
}
func addMemberClusterResource(targetCm *corev1.ConfigMap, t *testing.T) {
con, err := clientcmd.NewClientConfigFromBytes([]byte(base64EncodedKubeConfig))
if err != nil {
t.Fatal(err)
}
cli, err := con.ClientConfig()
if err != nil {
t.Fatal(err)
}
c, err := k8s.NewForConfig(cli)
if err != nil {
t.Fatal(err)
}
_, err = c.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: KubesphereNamespace}}, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
t.Fatal(err)
}
_, err = c.CoreV1().ConfigMaps(KubesphereNamespace).Create(context.Background(), targetCm, metav1.CreateOptions{})
if err != nil && errors.IsAlreadyExists(err) {
_, err = c.CoreV1().ConfigMaps(KubesphereNamespace).Update(context.Background(), targetCm, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
} else if err != nil {
t.Fatal(err)
}
deploy := appsv1.Deployment{}
err = json.Unmarshal([]byte(ksApiserverDeploy), &deploy)
if err != nil {
t.Fatal(err)
}
_, err = c.AppsV1().Deployments(KubesphereNamespace).Create(context.Background(), &deploy, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
t.Fatal(err)
}
}
......@@ -17,15 +17,17 @@ limitations under the License.
package v1alpha1
import (
"net/http"
"github.com/emicklei/go-restful"
restfulspec "github.com/emicklei/go-restful-openapi"
"k8s.io/apimachinery/pkg/runtime/schema"
k8sinformers "k8s.io/client-go/informers"
"kubesphere.io/kubesphere/pkg/api"
"kubesphere.io/kubesphere/pkg/apiserver/runtime"
"kubesphere.io/kubesphere/pkg/client/informers/externalversions"
"kubesphere.io/kubesphere/pkg/constants"
"net/http"
)
const (
......@@ -42,7 +44,7 @@ func AddToContainer(container *restful.Container,
agentImage string) error {
webservice := runtime.NewWebService(GroupVersion)
h := newHandler(k8sInformers.Core().V1().Services().Lister(), ksInformers.Cluster().V1alpha1().Clusters().Lister(), proxyService, proxyAddress, agentImage)
h := newHandler(k8sInformers, ksInformers, proxyService, proxyAddress, agentImage)
// returns deployment yaml for cluster agent
webservice.Route(webservice.GET("/clusters/{cluster}/agent/deployment").
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册