提交 49d471f4 编写于 作者: M Matt Rickard

Add experimental kubeadm bootstrapper

上级 651128de
......@@ -34,6 +34,7 @@ import (
configCmd "k8s.io/minikube/cmd/minikube/cmd/config"
"k8s.io/minikube/cmd/util"
"k8s.io/minikube/pkg/minikube/bootstrapper"
"k8s.io/minikube/pkg/minikube/bootstrapper/kubeadm"
"k8s.io/minikube/pkg/minikube/bootstrapper/localkube"
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/constants"
......@@ -174,6 +175,11 @@ func GetClusterBootstrapper(api libmachine.API, bootstrapperName string) (bootst
if err != nil {
return nil, errors.Wrap(err, "getting localkube bootstrapper")
}
case bootstrapper.BootstrapperTypeKubeadm:
b, err = kubeadm.NewKubeadmBootstrapper(api)
if err != nil {
return nil, errors.Wrap(err, "getting kubeadm bootstrapper")
}
default:
return nil, fmt.Errorf("Unknown bootstrapper: %s", bootstrapperName)
}
......
......@@ -20,6 +20,7 @@ import (
"bytes"
"io"
"os"
"path/filepath"
"github.com/pkg/errors"
)
......@@ -63,6 +64,10 @@ type FileAsset struct {
BaseAsset
}
func NewMemoryAssetTarget(d []byte, targetPath, permissions string) *MemoryAsset {
return NewMemoryAsset(d, filepath.Dir(targetPath), filepath.Base(targetPath), permissions)
}
func NewFileAsset(assetName, targetDir, targetName, permissions string) (*FileAsset, error) {
f := &FileAsset{
BaseAsset{
......
......@@ -48,6 +48,7 @@ type KubernetesConfig struct {
const (
BootstrapperTypeLocalkube = "localkube"
BootstrapperTypeKubeadm = "kubeadm"
)
var CachedImagesForBootstrapper = map[string][]string{
......
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeadm
import (
"bytes"
"crypto"
"fmt"
"html/template"
"os"
"path/filepath"
"strings"
"time"
"github.com/docker/machine/libmachine"
"github.com/docker/machine/libmachine/state"
download "github.com/jimmidyson/go-download"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/minikube/bootstrapper"
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/sshutil"
"k8s.io/minikube/pkg/util"
)
type KubeadmBootstrapper struct {
c bootstrapper.CommandRunner
}
// TODO(r2d4): template this with bootstrapper.KubernetesConfig
const kubeletSystemdConf = `
[Service]
Environment="KUBELET_KUBECONFIG_ARGS=--kubeconfig=/etc/kubernetes/kubelet.conf --require-kubeconfig=true"
Environment="KUBELET_SYSTEM_PODS_ARGS=--pod-manifest-path=/etc/kubernetes/manifests --allow-privileged=true"
Environment="KUBELET_DNS_ARGS=--cluster-dns=10.0.0.10 --cluster-domain=cluster.local"
Environment="KUBELET_CADVISOR_ARGS=--cadvisor-port=0"
Environment="KUBELET_CGROUP_ARGS=--cgroup-driver=cgroupfs"
ExecStart=
ExecStart=/usr/bin/kubelet $KUBELET_KUBECONFIG_ARGS $KUBELET_SYSTEM_PODS_ARGS $KUBELET_DNS_ARGS $KUBELET_CADVISOR_ARGS $KUBELET_CGROUP_ARGS $KUBELET_EXTRA_ARGS
`
const kubeletService = `
[Unit]
Description=kubelet: The Kubernetes Node Agent
Documentation=http://kubernetes.io/docs/
[Service]
ExecStart=/usr/bin/kubelet
Restart=always
StartLimitInterval=0
RestartSec=10
[Install]
WantedBy=multi-user.target
`
const kubeadmConfigTmpl = `
apiVersion: kubeadm.k8s.io/v1alpha1
kind: MasterConfiguration
api:
advertiseAddress: {{.AdvertiseAddress}}
bindPort: {{.APIServerPort}}
kubernetesVersion: {{.KubernetesVersion}}
certificatesDir: {{.CertDir}}
networking:
serviceSubnet: {{.ServiceCIDR}}
etcd:
dataDir: {{.EtcdDataDir}}
`
func NewKubeadmBootstrapper(api libmachine.API) (*KubeadmBootstrapper, error) {
h, err := api.Load(config.GetMachineName())
if err != nil {
return nil, errors.Wrap(err, "getting api client")
}
var cmd bootstrapper.CommandRunner
// The none driver executes commands directly on the host
if h.Driver.DriverName() == constants.DriverNone {
cmd = &bootstrapper.ExecRunner{}
} else {
client, err := sshutil.NewSSHClient(h.Driver)
if err != nil {
return nil, errors.Wrap(err, "getting ssh client")
}
cmd = bootstrapper.NewSSHRunner(client)
}
return &KubeadmBootstrapper{
c: cmd,
}, nil
}
//TODO(r2d4): This should most likely check the health of the apiserver
func (k *KubeadmBootstrapper) GetClusterStatus() (string, error) {
statusCmd := `sudo systemctl is-active kubelet &>/dev/null && echo "Running" || echo "Stopped"`
status, err := k.c.CombinedOutput(statusCmd)
if err != nil {
return "", errors.Wrap(err, "getting status")
}
status = strings.TrimSpace(status)
if status == state.Running.String() || status == state.Stopped.String() {
return status, nil
}
return "", fmt.Errorf("Error: Unrecognized output from ClusterStatus: %s", status)
}
// TODO(r2d4): Should this aggregate all the logs from the control plane?
// Maybe subcommands for each component? minikube logs apiserver?
func (k *KubeadmBootstrapper) GetClusterLogs(follow bool) (string, error) {
var flags []string
if follow {
flags = append(flags, "-f")
}
logsCommand := fmt.Sprintf("sudo journalctl %s -u kubelet", strings.Join(flags, " "))
if follow {
if err := k.c.Run(logsCommand); err != nil {
return "", errors.Wrap(err, "getting shell")
}
}
logs, err := k.c.CombinedOutput(logsCommand)
if err != nil {
return "", errors.Wrap(err, "getting cluster logs")
}
return logs, nil
}
func (k *KubeadmBootstrapper) StartCluster(k8s bootstrapper.KubernetesConfig) error {
// We use --skip-preflight-checks since we have our own custom addons
// that we also stick in /etc/kubernetes/manifests
kubeadmTmpl := "sudo /usr/bin/kubeadm init --config {{.KubeadmConfigFile}} --skip-preflight-checks"
t := template.Must(template.New("kubeadmTmpl").Parse(kubeadmTmpl))
b := bytes.Buffer{}
if err := t.Execute(&b, struct{ KubeadmConfigFile string }{constants.KubeadmConfigFile}); err != nil {
return err
}
err := k.c.Run(b.String())
if err != nil {
return errors.Wrapf(err, "kubeadm init error running command: %s", b.String())
}
//TODO(r2d4): get rid of global here
master = k8s.NodeName
if err := util.RetryAfter(100, unmarkMaster, time.Millisecond*500); err != nil {
return errors.Wrap(err, "timed out waiting to unmark master")
}
if err := util.RetryAfter(100, elevateKubeSystemPrivileges, time.Millisecond*500); err != nil {
return errors.Wrap(err, "timed out waiting to elevate kube-system RBAC privileges")
}
return nil
}
//TODO(r2d4): Split out into shared function between localkube and kubeadm
func addAddons(files *[]assets.CopyableFile) error {
// add addons to file list
// custom addons
assets.AddMinikubeAddonsDirToAssets(files)
// bundled addons
for addonName, addonBundle := range assets.Addons {
// TODO(r2d4): Kubeadm ignores the kube-dns addon and uses its own.
// expose this in a better way
if addonName == "kube-dns" {
continue
}
if isEnabled, err := addonBundle.IsEnabled(); err == nil && isEnabled {
for _, addon := range addonBundle.Assets {
*files = append(*files, addon)
}
} else if err != nil {
return nil
}
}
return nil
}
func (k *KubeadmBootstrapper) RestartCluster(k8s bootstrapper.KubernetesConfig) error {
restoreTmpl := `
sudo kubeadm alpha phase certs all --config {{.KubeadmConfigFile}} &&
sudo /usr/bin/kubeadm alpha phase kubeconfig all --config {{.KubeadmConfigFile}} --node-name {{.NodeName}} &&
sudo /usr/bin/kubeadm alpha phase controlplane all --config {{.KubeadmConfigFile}} &&
sudo /usr/bin/kubeadm alpha phase etcd local --config {{.KubeadmConfigFile}}
`
t := template.Must(template.New("restoreTmpl").Parse(restoreTmpl))
opts := struct {
KubeadmConfigFile string
NodeName string
}{
KubeadmConfigFile: constants.KubeadmConfigFile,
NodeName: k8s.NodeName,
}
b := bytes.Buffer{}
if err := t.Execute(&b, opts); err != nil {
return err
}
if err := k.c.Run(b.String()); err != nil {
return errors.Wrapf(err, "running cmd: %s", b.String())
}
return nil
}
func (k *KubeadmBootstrapper) SetupCerts(k8s bootstrapper.KubernetesConfig) error {
return bootstrapper.SetupCerts(k.c, k8s)
}
func (k *KubeadmBootstrapper) UpdateCluster(cfg bootstrapper.KubernetesConfig) error {
kubeadmCfg, err := k.generateConfig(cfg)
if err != nil {
return errors.Wrap(err, "generating kubeadm cfg")
}
files := []assets.CopyableFile{
assets.NewMemoryAssetTarget([]byte(kubeletService), constants.KubeletServiceFile, "0640"),
assets.NewMemoryAssetTarget([]byte(kubeletSystemdConf), constants.KubeletSystemdConfFile, "0640"),
assets.NewMemoryAssetTarget([]byte(kubeadmCfg), constants.KubeadmConfigFile, "0640"),
}
if err := addAddons(&files); err != nil {
return errors.Wrap(err, "adding addons to copyable files")
}
for _, f := range files {
if err := k.c.Copy(f); err != nil {
return errors.Wrapf(err, "transferring kubeadm file: %+v", f)
}
}
var g errgroup.Group
for _, bin := range []string{"kubelet", "kubeadm"} {
bin := bin
g.Go(func() error {
path, err := maybeDownloadAndCache(bin, cfg.KubernetesVersion)
if err != nil {
return errors.Wrapf(err, "downloading %s", bin)
}
f, err := assets.NewFileAsset(path, "/usr/bin", bin, "0641")
if err != nil {
return errors.Wrap(err, "making new file asset")
}
if err := k.c.Copy(f); err != nil {
return errors.Wrapf(err, "transferring kubeadm file: %+v", f)
}
return nil
})
}
if err := g.Wait(); err != nil {
return errors.Wrap(err, "downloading binaries")
}
err = k.c.Run(`
sudo systemctl daemon-reload &&
sudo systemctl enable kubelet &&
sudo systemctl start kubelet
`)
if err != nil {
return errors.Wrap(err, "starting kubelet")
}
return nil
}
func (k *KubeadmBootstrapper) generateConfig(k8s bootstrapper.KubernetesConfig) (string, error) {
t := template.Must(template.New("kubeadmConfigTmpl").Parse(kubeadmConfigTmpl))
opts := struct {
CertDir string
ServiceCIDR string
AdvertiseAddress string
APIServerPort int
KubernetesVersion string
EtcdDataDir string
}{
CertDir: util.DefaultCertPath,
ServiceCIDR: util.DefaultInsecureRegistry,
AdvertiseAddress: k8s.NodeIP,
APIServerPort: util.APIServerPort,
KubernetesVersion: k8s.KubernetesVersion,
EtcdDataDir: "/data", //TODO(r2d4): change to something else persisted
}
b := bytes.Buffer{}
if err := t.Execute(&b, opts); err != nil {
return "", err
}
return b.String(), nil
}
func maybeDownloadAndCache(binary, version string) (string, error) {
targetDir := constants.MakeMiniPath("cache", version)
targetFilepath := filepath.Join(targetDir, binary)
_, err := os.Stat(targetDir)
// If it exists, do no verification and continue
if err == nil {
return targetFilepath, nil
}
if !os.IsNotExist(err) {
return "", errors.Wrapf(err, "stat %s version %s at %s", binary, version, targetDir)
}
if err = os.MkdirAll(targetDir, 0777); err != nil {
return "", errors.Wrapf(err, "mkdir %s", targetDir)
}
url := constants.GetKubernetesReleaseURL(binary, version)
options := download.FileOptions{
Mkdirs: download.MkdirAll,
}
options.Checksum = constants.GetKubernetesReleaseURLSha1(binary, version)
options.ChecksumHash = crypto.SHA1
fmt.Printf("Downloading %s %s\n", binary, version)
if err := download.ToFile(url, targetFilepath, options); err != nil {
return "", errors.Wrapf(err, "Error downloading %s %s", binary, version)
}
fmt.Printf("Finished Downloading %s %s\n", binary, version)
return targetFilepath, nil
}
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeadm
import (
"encoding/json"
"github.com/pkg/errors"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
clientv1 "k8s.io/client-go/pkg/api/v1"
rbacv1beta1 "k8s.io/client-go/pkg/apis/rbac/v1beta1"
"k8s.io/minikube/pkg/minikube/service"
)
const masterTaint = "node-role.kubernetes.io/master"
var master = ""
func unmarkMaster() error {
k8s := service.K8s
client, err := k8s.GetCoreClient()
if err != nil {
return errors.Wrap(err, "getting core client")
}
n, err := client.Nodes().Get(master, v1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "getting node %s", master)
}
oldData, err := json.Marshal(n)
if err != nil {
return errors.Wrap(err, "json marshalling data before patch")
}
newTaints := []clientv1.Taint{}
for _, taint := range n.Spec.Taints {
if taint.Key == masterTaint {
continue
}
newTaints = append(newTaints, taint)
}
n.Spec.Taints = newTaints
newData, err := json.Marshal(n)
if err != nil {
return errors.Wrapf(err, "json marshalling data after patch")
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, clientv1.Node{})
if err != nil {
return errors.Wrap(err, "creating strategic patch")
}
if _, err := client.Nodes().Patch(n.Name, types.StrategicMergePatchType, patchBytes); err != nil {
if apierrs.IsConflict(err) {
return errors.Wrap(err, "strategic patch conflict")
}
return errors.Wrap(err, "applying strategic patch")
}
return nil
}
// elevateKubeSystemPrivileges gives the kube-system service account
// cluster admin privileges to work with RBAC.
func elevateKubeSystemPrivileges() error {
k8s := service.K8s
client, err := k8s.GetClientset()
clusterRoleBinding := &rbacv1beta1.ClusterRoleBinding{
ObjectMeta: v1.ObjectMeta{
Name: "minikube-rbac",
},
Subjects: []rbacv1beta1.Subject{
{
Kind: "ServiceAccount",
Name: "default",
Namespace: "kube-system",
},
},
RoleRef: rbacv1beta1.RoleRef{
Kind: "ClusterRole",
Name: "cluster-admin",
},
}
_, err = client.RbacV1beta1().ClusterRoleBindings().Create(clusterRoleBinding)
if err != nil {
return errors.Wrap(err, "creating clusterrolebinding")
}
return nil
}
......@@ -130,6 +130,12 @@ const (
LocalkubePIDPath = "/var/run/localkube.pid"
)
const (
KubeletServiceFile = "/lib/systemd/system/kubelet.service"
KubeletSystemdConfFile = "/etc/systemd/system/kubelet.service.d/10-kubeadm.conf"
KubeadmConfigFile = "/var/lib/kubeadm.yaml"
)
const (
LocalkubeServicePath = "/usr/lib/systemd/system/localkube.service"
LocalkubeRunning = "active"
......@@ -144,6 +150,17 @@ const (
DefaultMountVersion = "9p2000.u"
)
func GetKubernetesReleaseURL(binaryName, version string) string {
// TODO(r2d4): change this to official releases when the alpha controlplane commands are released.
// We are working with unreleased kubeadm changes at HEAD.
return fmt.Sprintf("https://storage.googleapis.com/minikube-builds/v1.7.3/%s", binaryName)
// return fmt.Sprintf("https://storage.googleapis.com/kubernetes-release/release/%s/bin/linux/amd64/%s", version, binaryName)
}
func GetKubernetesReleaseURLSha1(binaryName, version string) string {
return fmt.Sprintf("%s.sha1", GetKubernetesReleaseURL(binaryName, version))
}
const IsMinikubeChildProcess = "IS_MINIKUBE_CHILD_PROCESS"
const DriverNone = "none"
const FileScheme = "file"
......
......@@ -42,17 +42,26 @@ import (
type K8sClient interface {
GetCoreClient() (corev1.CoreV1Interface, error)
GetClientset() (*kubernetes.Clientset, error)
}
type K8sClientGetter struct{}
var k8s K8sClient
var K8s K8sClient
func init() {
k8s = &K8sClientGetter{}
K8s = &K8sClientGetter{}
}
func (*K8sClientGetter) GetCoreClient() (corev1.CoreV1Interface, error) {
func (k *K8sClientGetter) GetCoreClient() (corev1.CoreV1Interface, error) {
client, err := k.GetClientset()
if err != nil {
return nil, errors.Wrap(err, "getting clientset")
}
return client.Core(), nil
}
func (*K8sClientGetter) GetClientset() (*kubernetes.Clientset, error) {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
configOverrides := &clientcmd.ConfigOverrides{}
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides)
......@@ -64,7 +73,8 @@ func (*K8sClientGetter) GetCoreClient() (corev1.CoreV1Interface, error) {
if err != nil {
return nil, errors.Wrap(err, "Error creating new client from kubeConfig.ClientConfig()")
}
return client.Core(), nil
return client, nil
}
type ServiceURL struct {
......@@ -88,7 +98,7 @@ func GetServiceURLs(api libmachine.API, namespace string, t *template.Template)
return nil, err
}
client, err := k8s.GetCoreClient()
client, err := K8s.GetCoreClient()
if err != nil {
return nil, err
}
......@@ -125,7 +135,7 @@ func GetServiceURLsForService(api libmachine.API, namespace, service string, t *
return nil, errors.Wrap(err, "Error getting ip from host")
}
client, err := k8s.GetCoreClient()
client, err := K8s.GetCoreClient()
if err != nil {
return nil, err
}
......@@ -178,7 +188,7 @@ func printURLsForService(c corev1.CoreV1Interface, ip, service, namespace string
// CheckService waits for the specified service to be ready by returning an error until the service is up
// The check is done by polling the endpoint associated with the service and when the endpoint exists, returning no error->service-online
func CheckService(namespace string, service string) error {
client, err := k8s.GetCoreClient()
client, err := K8s.GetCoreClient()
if err != nil {
return errors.Wrap(err, "Error getting kubernetes client")
}
......@@ -242,7 +252,7 @@ func WaitAndMaybeOpenService(api libmachine.API, namespace string, service strin
}
func GetServiceListByLabel(namespace string, key string, value string) (*v1.ServiceList, error) {
client, err := k8s.GetCoreClient()
client, err := K8s.GetCoreClient()
if err != nil {
return &v1.ServiceList{}, &util.RetriableError{Err: err}
}
......@@ -265,7 +275,7 @@ func getServiceListFromServicesByLabel(services corev1.ServiceInterface, key str
// CreateSecret creates or modifies secrets
func CreateSecret(namespace, name string, dataValues map[string]string, labels map[string]string) error {
client, err := k8s.GetCoreClient()
client, err := K8s.GetCoreClient()
if err != nil {
return &util.RetriableError{Err: err}
}
......@@ -311,7 +321,7 @@ func CreateSecret(namespace, name string, dataValues map[string]string, labels m
// DeleteSecret deletes a secret from a namespace
func DeleteSecret(namespace, name string) error {
client, err := k8s.GetCoreClient()
client, err := K8s.GetCoreClient()
if err != nil {
return &util.RetriableError{Err: err}
}
......
......@@ -26,6 +26,7 @@ import (
"github.com/docker/machine/libmachine/host"
"github.com/pkg/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/kubernetes/typed/core/v1/fake"
"k8s.io/client-go/pkg/api/v1"
......@@ -43,6 +44,10 @@ func (m *MockClientGetter) GetCoreClient() (corev1.CoreV1Interface, error) {
}, nil
}
func (m *MockClientGetter) GetClientset() (*kubernetes.Clientset, error) {
return nil, nil
}
type MockCoreClient struct {
fake.FakeCoreV1
servicesMap map[string]corev1.ServiceInterface
......@@ -318,13 +323,13 @@ func TestGetServiceURLs(t *testing.T) {
},
}
defer revertK8sClient(k8s)
defer revertK8sClient(K8s)
for _, test := range tests {
test := test
t.Run(test.description, func(t *testing.T) {
t.Parallel()
k8s = &MockClientGetter{
K8s = &MockClientGetter{
servicesMap: serviceNamespaces,
}
urls, err := GetServiceURLs(test.api, test.namespace, defaultTemplate)
......@@ -383,11 +388,11 @@ func TestGetServiceURLsForService(t *testing.T) {
},
}
defer revertK8sClient(k8s)
defer revertK8sClient(K8s)
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
t.Parallel()
k8s = &MockClientGetter{
K8s = &MockClientGetter{
servicesMap: serviceNamespaces,
}
urls, err := GetServiceURLsForService(test.api, test.namespace, test.service, defaultTemplate)
......@@ -405,5 +410,5 @@ func TestGetServiceURLsForService(t *testing.T) {
}
func revertK8sClient(k K8sClient) {
k8s = k
K8s = k
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册