Commit 773cfb7f authored by Thomas Stromberg

Expand lock scope for kubeconfig and certificate update ops

Parent c376994b
......@@ -57,8 +57,26 @@ var (
// SetupCerts gets the generated credentials required to talk to the APIServer.
func SetupCerts(cmd command.Runner, k8s config.KubernetesConfig) error {
// WARNING: This function was not designed for multiple profiles, so it is VERY racey:
//
// It updates a shared certificate file and uploads it to the apiserver before launch.
//
// If another process updates the shared certificate, it's invalid.
// TODO: Instead of racey manipulation of a shared certificate, use per-profile certs
spec := mutex.Spec{
Name: "setupCerts",
Clock: clock.WallClock,
Delay: 15 * time.Second,
}
glog.Infof("acquiring lock: %+v", spec)
releaser, err := mutex.Acquire(spec)
if err != nil {
return errors.Wrapf(err, "unable to acquire lock for %+v", spec)
}
defer releaser.Release()
localPath := constants.GetMinipath()
glog.Infof("Setting up certificates for IP: %s\n", k8s.NodeIP)
glog.Infof("Setting up %s for IP: %s\n", localPath, k8s.NodeIP)
if err := generateCerts(k8s); err != nil {
return errors.Wrap(err, "Error generating certs")
......@@ -126,19 +144,6 @@ func SetupCerts(cmd command.Runner, k8s config.KubernetesConfig) error {
}
func generateCerts(k8s config.KubernetesConfig) error {
// TODO: Instead of racey manipulation of a shared certificate, use per-profile certs
spec := mutex.Spec{
Name: "generateCerts",
Clock: clock.WallClock,
Delay: 10 * time.Second,
}
glog.Infof("acquiring lock: %+v", spec)
releaser, err := mutex.Acquire(spec)
if err != nil {
return errors.Wrapf(err, "unable to acquire lock for %+v", spec)
}
defer releaser.Release()
serviceIP, err := util.GetServiceClusterIP(k8s.ServiceCIDR)
if err != nil {
return errors.Wrap(err, "getting service cluster ip")
......
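Both hunks above wrap the shared certificate work in a cross-process lock from github.com/juju/mutex. For readers unfamiliar with that library, here is a minimal, self-contained sketch of the pattern the commit applies; the withLock helper and the main function are illustrative, not part of the commit.

```go
package main

import (
	"log"
	"time"

	"github.com/juju/clock"
	"github.com/juju/mutex"
)

// withLock runs fn while holding a machine-wide named lock, so two concurrent
// minikube invocations cannot rewrite the shared certificate files at once.
// Sketch only; the real code inlines this around SetupCerts/generateCerts.
func withLock(name string, fn func() error) error {
	spec := mutex.Spec{
		Name:  name,             // lock identity, shared across processes
		Clock: clock.WallClock,  // real time source for retry delays
		Delay: 15 * time.Second, // wait between acquisition attempts
	}
	// Acquire blocks until the lock is held (or the spec's optional
	// Timeout/Cancel fires).
	releaser, err := mutex.Acquire(spec)
	if err != nil {
		return err
	}
	defer releaser.Release()
	return fn()
}

func main() {
	err := withLock("setupCerts", func() error {
		log.Println("holding setupCerts; safe to regenerate shared certs")
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```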
......@@ -228,6 +228,12 @@ func (k *Bootstrapper) createCompatSymlinks() error {
// StartCluster starts the cluster
func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {
start := time.Now()
glog.Infof("StartCluster: %+v", k8s)
defer func() {
glog.Infof("StartCluster complete in %s", time.Since(start))
}()
version, err := parseKubernetesVersion(k8s.KubernetesVersion)
if err != nil {
return errors.Wrap(err, "parsing kubernetes version")
......@@ -266,7 +272,15 @@ func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {
glog.Infof("Configuring cluster permissions ...")
if err := retry.Expo(elevateKubeSystemPrivileges, time.Millisecond*500, 60*time.Second); err != nil {
elevate := func() error {
client, err := k.client(k8s)
if err != nil {
return err
}
return elevateKubeSystemPrivileges(client)
}
if err := retry.Expo(elevate, time.Millisecond*500, 120*time.Second); err != nil {
return errors.Wrap(err, "timed out waiting to elevate kube-system RBAC privileges")
}
......@@ -326,6 +340,23 @@ func addAddons(files *[]assets.CopyableFile, data interface{}) error {
return nil
}
// client returns a Kubernetes client to use to speak to a kubeadm launched apiserver
func (k *Bootstrapper) client(k8s config.KubernetesConfig) (*kubernetes.Clientset, error) {
// Catch case if WaitCluster was called with a stale ~/.kube/config
config, err := kapi.ClientConfig(k.contextName)
if err != nil {
return nil, errors.Wrap(err, "client config")
}
endpoint := fmt.Sprintf("https://%s:%d", k8s.NodeIP, k8s.NodePort)
if config.Host != endpoint {
glog.Errorf("Overriding stale ClientConfig host %s with %s", config.Host, endpoint)
config.Host = endpoint
}
return kubernetes.NewForConfig(config)
}
// WaitCluster blocks until Kubernetes appears to be healthy.
func (k *Bootstrapper) WaitCluster(k8s config.KubernetesConfig, timeout time.Duration) error {
// Do not wait for "k8s-app" pods in the case of CNI, as they are managed
......@@ -341,22 +372,11 @@ func (k *Bootstrapper) WaitCluster(k8s config.KubernetesConfig, timeout time.Dur
return errors.Wrap(err, "waiting for apiserver")
}
// Catch case if WaitCluster was called with a stale ~/.kube/config
config, err := kapi.ClientConfig(k.contextName)
client, err := k.client(k8s)
if err != nil {
return errors.Wrap(err, "client config")
return errors.Wrap(err, "client")
}
endpoint := fmt.Sprintf("https://%s:%d", k8s.NodeIP, k8s.NodePort)
if config.Host != endpoint {
glog.Errorf("Overriding stale ClientConfig host %s with %s", config.Host, endpoint)
config.Host = endpoint
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
return errors.Wrap(err, "k8s client")
}
for _, p := range PodsByLayer {
if componentsOnly && p.key != "component" { // skip component check if network plugin is cni
continue
......
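The new client() helper exists because ~/.kube/config can still point at a stale apiserver address after the VM is recreated. A rough, self-contained equivalent using client-go's clientcmd directly is sketched below; kapi.ClientConfig is not shown in this diff, so the loading code, the example context name, and the 192.168.99.100:8443 endpoint are assumptions used only for illustration.

```go
package main

import (
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// clientFor builds a clientset from the named kubeconfig context, then
// overrides a stale server address with the endpoint the apiserver was just
// started on. This mirrors the technique of the new client() helper; the
// clientcmd loading code is an approximation, not a copy of kapi.ClientConfig.
func clientFor(contextName, nodeIP string, nodePort int) (*kubernetes.Clientset, error) {
	loader := clientcmd.NewDefaultClientConfigLoadingRules()
	overrides := &clientcmd.ConfigOverrides{CurrentContext: contextName}
	cfg, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loader, overrides).ClientConfig()
	if err != nil {
		return nil, err
	}
	endpoint := fmt.Sprintf("https://%s:%d", nodeIP, nodePort)
	if cfg.Host != endpoint {
		// ~/.kube/config can lag behind a recreated VM; trust what we started.
		cfg.Host = endpoint
	}
	return kubernetes.NewForConfig(cfg)
}

func main() {
	c, err := clientFor("minikube", "192.168.99.100", 8443)
	if err != nil {
		fmt.Println("building client:", err)
		return
	}
	v, err := c.Discovery().ServerVersion()
	if err != nil {
		fmt.Println("contacting apiserver:", err)
		return
	}
	fmt.Println("apiserver version:", v.GitVersion)
}
```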
......@@ -24,8 +24,7 @@ import (
"github.com/pkg/errors"
rbac "k8s.io/api/rbac/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/service"
"k8s.io/client-go/kubernetes"
"k8s.io/minikube/pkg/util/retry"
)
......@@ -35,13 +34,8 @@ const (
// elevateKubeSystemPrivileges gives the kube-system service account
// cluster admin privileges to work with RBAC.
func elevateKubeSystemPrivileges() error {
func elevateKubeSystemPrivileges(client kubernetes.Interface) error {
start := time.Now()
k8s := service.K8s
client, err := k8s.GetClientset(constants.DefaultK8sClientTimeout)
if err != nil {
return errors.Wrap(err, "getting clientset")
}
clusterRoleBinding := &rbac.ClusterRoleBinding{
ObjectMeta: meta.ObjectMeta{
Name: rbacName,
......@@ -63,8 +57,7 @@ func elevateKubeSystemPrivileges() error {
glog.Infof("Role binding %s already exists. Skipping creation.", rbacName)
return nil
}
_, err = client.RbacV1beta1().ClusterRoleBindings().Create(clusterRoleBinding)
if err != nil {
if _, err := client.RbacV1beta1().ClusterRoleBindings().Create(clusterRoleBinding); err != nil {
netErr, ok := err.(net.Error)
if ok && netErr.Timeout() {
return &retry.RetriableError{Err: errors.Wrap(err, "creating clusterrolebinding")}
......
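Changing elevateKubeSystemPrivileges to accept kubernetes.Interface (instead of building its own clientset) also lets the function be exercised against a fake clientset. A sketch of that shape, assuming the client-go version in use at the time of this commit (no context arguments, rbac/v1beta1 typed client available); the helper name and binding name here are illustrative.

```go
package main

import (
	"fmt"

	rbac "k8s.io/api/rbac/v1beta1"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// createAdminBinding mirrors the shape of elevateKubeSystemPrivileges: it
// takes kubernetes.Interface so tests can inject a fake clientset.
func createAdminBinding(client kubernetes.Interface, name string) error {
	crb := &rbac.ClusterRoleBinding{
		ObjectMeta: meta.ObjectMeta{Name: name},
		Subjects: []rbac.Subject{
			{Kind: "ServiceAccount", Name: "default", Namespace: "kube-system"},
		},
		RoleRef: rbac.RoleRef{
			APIGroup: "rbac.authorization.k8s.io",
			Kind:     "ClusterRole",
			Name:     "cluster-admin",
		},
	}
	_, err := client.RbacV1beta1().ClusterRoleBindings().Create(crb)
	return err
}

func main() {
	client := fake.NewSimpleClientset()
	if err := createAdminBinding(client, "minikube-rbac"); err != nil {
		fmt.Println("create failed:", err)
		return
	}
	fmt.Println("cluster role binding created against fake clientset")
}
```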
......@@ -470,7 +470,10 @@ func createHost(api libmachine.API, config cfg.MachineConfig) (*host.Host, error
if err := api.Save(h); err != nil {
return nil, errors.Wrap(err, "save")
}
return h, nil
// Ensure that even new VM's have proper time synchronization up front
// It's 2019, and I can't believe I am still dealing with time desync as a problem.
return h, ensureSyncedGuestClock(h)
}
// GetHostDockerEnv gets the necessary docker env variables to allow the use of docker through minikube's vm
......
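ensureSyncedGuestClock itself is not included in this diff. The sketch below only illustrates one way such a check could measure host/guest clock skew by running date inside the VM; runGuest is a hypothetical stand-in for a VM command runner, not minikube's API.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// guestClockSkew returns the absolute offset between the host clock and a
// guest timestamp obtained by running `date +%s.%N` inside the VM.
func guestClockSkew(runGuest func(cmd string) (string, error)) (time.Duration, error) {
	out, err := runGuest("date +%s.%N")
	if err != nil {
		return 0, err
	}
	secs, err := strconv.ParseFloat(strings.TrimSpace(out), 64)
	if err != nil {
		return 0, fmt.Errorf("parsing guest time %q: %v", out, err)
	}
	guest := time.Unix(0, int64(secs*float64(time.Second)))
	skew := time.Since(guest)
	if skew < 0 {
		skew = -skew
	}
	return skew, nil
}

func main() {
	// Fake runner that pretends the guest clock is 3 seconds behind the host.
	fakeRunner := func(string) (string, error) {
		return fmt.Sprintf("%.9f", float64(time.Now().Add(-3*time.Second).UnixNano())/1e9), nil
	}
	skew, err := guestClockSkew(fakeRunner)
	if err != nil {
		panic(err)
	}
	fmt.Printf("guest clock skew: %s\n", skew)
}
```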
......@@ -62,8 +62,8 @@ func CreateProfile(name string, cfg *Config, miniHome ...string) error {
if err != nil {
return err
}
glog.Infof("Saving config:\n%s", data)
path := profileFilePath(name, miniHome...)
glog.Infof("Saving config to %s ...", path)
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
return err
}
......
......@@ -19,8 +19,11 @@ package kubeconfig
import (
"io/ioutil"
"sync/atomic"
"time"
"github.com/golang/glog"
"github.com/juju/clock"
"github.com/juju/mutex"
"github.com/pkg/errors"
"k8s.io/client-go/tools/clientcmd/api"
)
......@@ -116,9 +119,17 @@ func PopulateFromSettings(cfg *Settings, apiCfg *api.Config) error {
// activeContext is true when minikube is the CurrentContext
// If no CurrentContext is set, the given name will be used.
func Update(kcs *Settings) error {
glog.Infoln("Using kubeconfig: ", kcs.filePath())
// Add a lock around both the read, update, and write operations
spec := mutex.Spec{Name: "kubeconfigUpdate", Clock: clock.WallClock, Delay: 10 * time.Second}
glog.Infof("acquiring lock: %+v", spec)
releaser, err := mutex.Acquire(spec)
if err != nil {
return errors.Wrapf(err, "unable to acquire lock for %+v", spec)
}
defer releaser.Release()
// read existing config or create new if does not exist
glog.Infoln("Updating kubeconfig: ", kcs.filePath())
kcfg, err := readOrNew(kcs.filePath())
if err != nil {
return err
......