Unverified commit 9b9d7c35, authored by Medya Ghazizadeh, committed by GitHub

Merge pull request #7591 from tstromberg/oci-go-faster

kic: Use SSHRunner by default (20% faster startup)
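The core of the change: instead of handing each consumer a pre-dialed `*ssh.Client`, the runner now stores the machine driver and dials lazily, with exponential backoff, on first use. Below is a minimal, self-contained sketch of that connect-on-first-use pattern; the `conn`/`dialer` types and `lazyRunner` are hypothetical stand-ins, not minikube APIs (the real code caches an `*ssh.Client` and retries via `retry.Expo`):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// conn stands in for an *ssh.Client; dialer for a driver that can create one.
type conn struct{ id int }
type dialer func() (*conn, error)

// lazyRunner mirrors the shape of the new SSHRunner: it stores how to
// connect rather than a live connection, and dials on first use.
type lazyRunner struct {
	dial dialer
	c    *conn
}

// client returns the cached connection, dialing with exponential backoff
// if none exists yet (a crude stand-in for minikube's retry.Expo helper).
func (r *lazyRunner) client() (*conn, error) {
	if r.c != nil {
		return r.c, nil
	}
	var last error
	for delay := 250 * time.Millisecond; delay <= 2*time.Second; delay *= 2 {
		c, err := r.dial()
		if err == nil {
			r.c = c
			return r.c, nil
		}
		last = err
		time.Sleep(delay)
	}
	return nil, last
}

func main() {
	attempts := 0
	r := &lazyRunner{dial: func() (*conn, error) {
		attempts++
		if attempts < 2 {
			return nil, errors.New("ssh not up yet") // e.g. sshd still provisioning
		}
		return &conn{id: attempts}, nil
	}}
	c, err := r.client()
	fmt.Printf("conn=%v err=%v attempts=%d\n", c, err, attempts)
}
```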
@@ -329,23 +329,24 @@ func profileDeletionErr(cname string, additionalInfo string) error {
 func uninstallKubernetes(api libmachine.API, cc config.ClusterConfig, n config.Node, bsName string) error {
 	out.T(out.Resetting, "Uninstalling Kubernetes {{.kubernetes_version}} using {{.bootstrapper_name}} ...", out.V{"kubernetes_version": cc.KubernetesConfig.KubernetesVersion, "bootstrapper_name": bsName})
-	clusterBootstrapper, err := cluster.Bootstrapper(api, bsName, cc, n)
+	host, err := machine.LoadHost(api, driver.MachineName(cc, n))
 	if err != nil {
-		return DeletionError{Err: fmt.Errorf("unable to get bootstrapper: %v", err), Errtype: Fatal}
+		return DeletionError{Err: fmt.Errorf("unable to load host: %v", err), Errtype: MissingCluster}
 	}
-	host, err := machine.LoadHost(api, driver.MachineName(cc, n))
+	r, err := machine.CommandRunner(host)
 	if err != nil {
-		exit.WithError("Error getting host", err)
+		return DeletionError{Err: fmt.Errorf("unable to get command runner %v", err), Errtype: MissingCluster}
 	}
-	r, err := machine.CommandRunner(host)
+	clusterBootstrapper, err := cluster.Bootstrapper(api, bsName, cc, r)
 	if err != nil {
-		exit.WithError("Failed to get command runner", err)
+		return DeletionError{Err: fmt.Errorf("unable to get bootstrapper: %v", err), Errtype: Fatal}
 	}
 	cr, err := cruntime.New(cruntime.Config{Type: cc.KubernetesConfig.ContainerRuntime, Runner: r})
 	if err != nil {
-		exit.WithError("Failed runtime", err)
+		return DeletionError{Err: fmt.Errorf("unable to get runtime: %v", err), Errtype: Fatal}
 	}

 	// Unpause the cluster if necessary to avoid hung kubeadm
......
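Note the error-handling change in the hunk above: failures while loading the host or building a runner now return a typed `DeletionError` classified as `MissingCluster` instead of calling `exit.WithError`, so the caller can decide whether the failure is fatal. A rough sketch of that typed-error pattern; only the `DeletionError`/`Errtype` names mirror the diff, the rest is illustrative:

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorType classifies how a deletion failure should be handled.
type ErrorType int

const (
	Fatal          ErrorType = iota // unrecoverable: abort deletion
	MissingCluster                  // cluster already gone: safe to continue
)

// DeletionError mirrors the pattern in the diff: an error plus a class.
type DeletionError struct {
	Err     error
	Errtype ErrorType
}

func (e DeletionError) Error() string { return e.Err.Error() }

func uninstall() error {
	return DeletionError{Err: errors.New("unable to load host"), Errtype: MissingCluster}
}

func main() {
	err := uninstall()
	var de DeletionError
	if errors.As(err, &de) && de.Errtype == MissingCluster {
		fmt.Println("host already gone, continuing deletion:", de)
	}
}
```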
@@ -53,7 +53,7 @@ var logsCmd = &cobra.Command{
 	Run: func(cmd *cobra.Command, args []string) {
 		co := mustload.Running(ClusterFlagValue())
-		bs, err := cluster.Bootstrapper(co.API, viper.GetString(cmdcfg.Bootstrapper), *co.Config, *co.CP.Node)
+		bs, err := cluster.Bootstrapper(co.API, viper.GetString(cmdcfg.Bootstrapper), *co.Config, co.CP.Runner)
 		if err != nil {
 			exit.WithError("Error getting cluster bootstrapper", err)
 		}
......
@@ -68,18 +68,8 @@ type Bootstrapper struct {
 }

 // NewBootstrapper creates a new kubeadm.Bootstrapper
-// TODO(#6891): Remove node as an argument
-func NewBootstrapper(api libmachine.API, cc config.ClusterConfig, n config.Node) (*Bootstrapper, error) {
-	name := driver.MachineName(cc, n)
-	h, err := api.Load(name)
-	if err != nil {
-		return nil, errors.Wrap(err, "getting api client")
-	}
-	runner, err := machine.CommandRunner(h)
-	if err != nil {
-		return nil, errors.Wrap(err, "command runner")
-	}
-	return &Bootstrapper{c: runner, contextName: cc.Name, k8sClient: nil}, nil
+func NewBootstrapper(api libmachine.API, cc config.ClusterConfig, r command.Runner) (*Bootstrapper, error) {
+	return &Bootstrapper{c: r, contextName: cc.Name, k8sClient: nil}, nil
 }

 // GetAPIServerStatus returns the api-server status
@@ -111,8 +101,7 @@ func (k *Bootstrapper) LogCommands(cfg config.ClusterConfig, o bootstrapper.LogO
 		dmesg.WriteString(fmt.Sprintf(" | tail -n %d", o.Lines))
 	}

-	describeNodes := fmt.Sprintf("sudo %s describe nodes --kubeconfig=%s",
-		path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubectl"),
+	describeNodes := fmt.Sprintf("sudo %s describe nodes --kubeconfig=%s", kubectlPath(cfg),
 		path.Join(vmpath.GuestPersistentDir, "kubeconfig"))

 	return map[string]string{
@@ -218,7 +207,7 @@ func (k *Bootstrapper) init(cfg config.ClusterConfig) error {
 	go func() {
 		// the overlay is required for containerd and cri-o runtime: see #7428
 		if driver.IsKIC(cfg.Driver) && cfg.KubernetesConfig.ContainerRuntime != "docker" {
-			if err := k.applyKicOverlay(cfg); err != nil {
+			if err := k.applyKICOverlay(cfg); err != nil {
 				glog.Errorf("failed to apply kic overlay: %v", err)
 			}
 		}
@@ -704,7 +693,7 @@ func (k *Bootstrapper) UpdateNode(cfg config.ClusterConfig, n config.Node, r cru
 	}

 	// Installs compatibility shims for non-systemd environments
-	kubeletPath := path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubectl")
+	kubeletPath := path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubelet")
 	shims, err := sm.GenerateInitShim("kubelet", kubeletPath, bsutil.KubeletSystemdConfFile)
 	if err != nil {
 		return errors.Wrap(err, "shim")
@@ -764,21 +753,32 @@ func startKubeletIfRequired(runner command.Runner, sm sysinit.Manager) error {
 	return sm.Start("kubelet")
 }

-// applyKicOverlay applies the CNI plugin needed to make kic work
-func (k *Bootstrapper) applyKicOverlay(cfg config.ClusterConfig) error {
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-	cmd := exec.CommandContext(ctx, "sudo",
-		path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubectl"), "create", fmt.Sprintf("--kubeconfig=%s", path.Join(vmpath.GuestPersistentDir, "kubeconfig")),
-		"-f", "-")
+// kubectlPath returns the path to the kubectl binary
+func kubectlPath(cfg config.ClusterConfig) string {
+	return path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubectl")
+}
+
+// applyKICOverlay applies the CNI plugin needed to make kic work
+func (k *Bootstrapper) applyKICOverlay(cfg config.ClusterConfig) error {
 	b := bytes.Buffer{}
 	if err := kicCNIConfig.Execute(&b, struct{ ImageName string }{ImageName: kic.OverlayImage}); err != nil {
 		return err
 	}
-	cmd.Stdin = bytes.NewReader(b.Bytes())
+
+	ko := path.Join(vmpath.GuestEphemeralDir, fmt.Sprintf("kic_overlay.yaml"))
+	f := assets.NewMemoryAssetTarget(b.Bytes(), ko, "0644")
+
+	if err := k.c.Copy(f); err != nil {
+		return errors.Wrapf(err, "copy")
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	cmd := exec.CommandContext(ctx, "sudo", kubectlPath(cfg), "apply",
+		fmt.Sprintf("--kubeconfig=%s", path.Join(vmpath.GuestPersistentDir, "kubeconfig")),
+		"-f", ko)
+
 	if rr, err := k.c.RunCmd(cmd); err != nil {
 		return errors.Wrapf(err, "cmd: %s output: %s", rr.Command(), rr.Output())
 	}
@@ -807,8 +807,7 @@ func (k *Bootstrapper) applyNodeLabels(cfg config.ClusterConfig) error {
 	defer cancel()
 	// example:
 	// sudo /var/lib/minikube/binaries/<version>/kubectl label nodes minikube.k8s.io/version=<version> minikube.k8s.io/commit=aa91f39ffbcf27dcbb93c4ff3f457c54e585cf4a-dirty minikube.k8s.io/name=p1 minikube.k8s.io/updated_at=2020_02_20T12_05_35_0700 --all --overwrite --kubeconfig=/var/lib/minikube/kubeconfig
-	cmd := exec.CommandContext(ctx, "sudo",
-		path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubectl"),
+	cmd := exec.CommandContext(ctx, "sudo", kubectlPath(cfg),
 		"label", "nodes", verLbl, commitLbl, nameLbl, createdAtLbl, "--all", "--overwrite",
 		fmt.Sprintf("--kubeconfig=%s", path.Join(vmpath.GuestPersistentDir, "kubeconfig")))
@@ -826,8 +825,7 @@ func (k *Bootstrapper) elevateKubeSystemPrivileges(cfg config.ClusterConfig) err
 	defer cancel()
 	rbacName := "minikube-rbac"
 	// kubectl create clusterrolebinding minikube-rbac --clusterrole=cluster-admin --serviceaccount=kube-system:default
-	cmd := exec.CommandContext(ctx, "sudo",
-		path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubectl"),
+	cmd := exec.CommandContext(ctx, "sudo", kubectlPath(cfg),
 		"create", "clusterrolebinding", rbacName, "--clusterrole=cluster-admin", "--serviceaccount=kube-system:default",
 		fmt.Sprintf("--kubeconfig=%s", path.Join(vmpath.GuestPersistentDir, "kubeconfig")))
 	rr, err := k.c.RunCmd(cmd)
......
@@ -26,6 +26,7 @@ import (
 	"k8s.io/minikube/pkg/minikube/bootstrapper"
 	"k8s.io/minikube/pkg/minikube/bootstrapper/kubeadm"
+	"k8s.io/minikube/pkg/minikube/command"
 	"k8s.io/minikube/pkg/minikube/config"
 	"k8s.io/minikube/pkg/minikube/exit"
 )
@@ -44,12 +45,12 @@ func init() {

 // Bootstrapper returns a new bootstrapper for the cluster
-// TODO(#6891): Remove node as an argument
-func Bootstrapper(api libmachine.API, bootstrapperName string, cc config.ClusterConfig, n config.Node) (bootstrapper.Bootstrapper, error) {
+func Bootstrapper(api libmachine.API, bootstrapperName string, cc config.ClusterConfig, r command.Runner) (bootstrapper.Bootstrapper, error) {
 	var b bootstrapper.Bootstrapper
 	var err error
 	switch bootstrapperName {
 	case bootstrapper.Kubeadm:
-		b, err = kubeadm.NewBootstrapper(api, cc, n)
+		b, err = kubeadm.NewBootstrapper(api, cc, r)
 		if err != nil {
 			return nil, errors.Wrap(err, "getting a new kubeadm bootstrapper")
 		}
......
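Threading a `command.Runner` through `Bootstrapper` instead of a `config.Node` is a plain dependency-injection move: the constructor no longer reaches back through the machine API to build its own runner, and callers (or tests) can pass any implementation. A compressed sketch of why that helps; `Runner` here is a reduced one-method interface, not minikube's full `command.Runner`, and the fake is hypothetical:

```go
package main

import "fmt"

// Runner is a reduced stand-in for minikube's command.Runner interface.
type Runner interface {
	Run(cmd string) (string, error)
}

// Bootstrapper depends only on the interface, as in the diff.
type Bootstrapper struct {
	c Runner
}

func NewBootstrapper(r Runner) *Bootstrapper { return &Bootstrapper{c: r} }

func (b *Bootstrapper) KubeletVersion() (string, error) {
	return b.c.Run("kubelet --version")
}

// fakeRunner lets tests exercise Bootstrapper with no SSH, VM, or container.
type fakeRunner struct{ replies map[string]string }

func (f fakeRunner) Run(cmd string) (string, error) { return f.replies[cmd], nil }

func main() {
	b := NewBootstrapper(fakeRunner{replies: map[string]string{
		"kubelet --version": "Kubernetes v1.18.0",
	}})
	v, _ := b.KubeletVersion()
	fmt.Println(v)
}
```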
@@ -25,12 +25,15 @@ import (
 	"sync"
 	"time"

+	"github.com/docker/machine/libmachine/drivers"
 	"github.com/golang/glog"
 	"github.com/kballard/go-shellquote"
 	"github.com/pkg/errors"
 	"golang.org/x/crypto/ssh"
 	"golang.org/x/sync/errgroup"
 	"k8s.io/minikube/pkg/minikube/assets"
+	"k8s.io/minikube/pkg/minikube/sshutil"
+	"k8s.io/minikube/pkg/util/retry"
 )

 var (
@@ -41,13 +44,53 @@ var (
 //
 // It implements the CommandRunner interface.
 type SSHRunner struct {
+	d drivers.Driver
 	c *ssh.Client
 }

 // NewSSHRunner returns a new SSHRunner that will run commands
 // through the ssh.Client provided.
-func NewSSHRunner(c *ssh.Client) *SSHRunner {
-	return &SSHRunner{c}
+func NewSSHRunner(d drivers.Driver) *SSHRunner {
+	return &SSHRunner{d: d, c: nil}
 }
+
+// client returns an ssh client (uses retry underneath)
+func (s *SSHRunner) client() (*ssh.Client, error) {
+	if s.c != nil {
+		return s.c, nil
+	}
+
+	c, err := sshutil.NewSSHClient(s.d)
+	if err != nil {
+		return nil, errors.Wrap(err, "new client")
+	}
+	s.c = c
+	return s.c, nil
+}
+
+// session returns an ssh session, retrying if necessary
+func (s *SSHRunner) session() (*ssh.Session, error) {
+	var sess *ssh.Session
+	getSession := func() (err error) {
+		client, err := s.client()
+		if err != nil {
+			return errors.Wrap(err, "new client")
+		}
+
+		sess, err = client.NewSession()
+		if err != nil {
+			glog.Warningf("session error, resetting client: %v", err)
+			s.c = nil
+			return err
+		}
+		return nil
+	}
+
+	if err := retry.Expo(getSession, 250*time.Millisecond, 2*time.Second); err != nil {
+		return nil, err
+	}
+	return sess, nil
+}
@@ -55,7 +98,7 @@ func (s *SSHRunner) Remove(f assets.CopyableFile) error {
 	dst := path.Join(f.GetTargetDir(), f.GetTargetName())
 	glog.Infof("rm: %s", dst)

-	sess, err := s.c.NewSession()
+	sess, err := s.session()
 	if err != nil {
 		return errors.Wrap(err, "getting ssh session")
 	}
@@ -97,6 +140,10 @@ func teeSSH(s *ssh.Session, cmd string, outB io.Writer, errB io.Writer) error {
 // RunCmd implements the Command Runner interface to run an exec.Cmd object
 func (s *SSHRunner) RunCmd(cmd *exec.Cmd) (*RunResult, error) {
+	if cmd.Stdin != nil {
+		return nil, fmt.Errorf("SSHRunner does not support stdin - you could be the first to add it")
+	}
+
 	rr := &RunResult{Args: cmd.Args}
 	glog.Infof("Run: %v", rr.Command())
@@ -117,7 +164,7 @@ func (s *SSHRunner) RunCmd(cmd *exec.Cmd) (*RunResult, error) {
 		errb = io.MultiWriter(cmd.Stderr, &rr.Stderr)
 	}

-	sess, err := s.c.NewSession()
+	sess, err := s.session()
 	if err != nil {
 		return rr, errors.Wrap(err, "NewSession")
 	}
@@ -170,10 +217,17 @@ func (s *SSHRunner) Copy(f assets.CopyableFile) error {
 		glog.Warningf("0 byte asset: %+v", f)
 	}

-	sess, err := s.c.NewSession()
+	sess, err := s.session()
 	if err != nil {
 		return errors.Wrap(err, "NewSession")
 	}
+	defer func() {
+		if err := sess.Close(); err != nil {
+			if err != io.EOF {
+				glog.Errorf("session close: %v", err)
+			}
+		}
+	}()

 	w, err := sess.StdinPipe()
 	if err != nil {
......
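One subtlety in the new SSHRunner above: `session()` does not just retry, it also drops the cached client on failure, so the next attempt dials a fresh connection. That is what lets commands survive an sshd restart mid-provisioning. A toy model of that invalidate-and-redial behavior; `fakeClient`/`runner` are stand-ins, and the real code uses `retry.Expo` and `*ssh.Client`:

```go
package main

import (
	"errors"
	"fmt"
)

// fakeClient stands in for *ssh.Client: NewSession fails once the
// underlying connection has died (say, sshd restarted mid-provisioning).
type fakeClient struct{ dead bool }

func (c *fakeClient) NewSession() (string, error) {
	if c.dead {
		return "", errors.New("connection lost")
	}
	return "session", nil
}

type runner struct {
	c    *fakeClient
	dial func() *fakeClient
}

// session mirrors the new SSHRunner.session: on failure it discards the
// cached client so the retry path dials a fresh connection.
func (r *runner) session() (string, error) {
	for attempt := 0; attempt < 2; attempt++ { // stand-in for retry.Expo
		if r.c == nil {
			r.c = r.dial()
		}
		s, err := r.c.NewSession()
		if err == nil {
			return s, nil
		}
		fmt.Println("session error, resetting client:", err)
		r.c = nil // the key line: invalidate so the next attempt re-dials
	}
	return "", errors.New("gave up")
}

func main() {
	r := &runner{
		c:    &fakeClient{dead: true}, // a stale cached connection
		dial: func() *fakeClient { return &fakeClient{} },
	}
	fmt.Println(r.session())
}
```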
@@ -36,7 +36,7 @@ import (
 	"github.com/docker/machine/libmachine/host"
 	"github.com/docker/machine/libmachine/mcnutils"
 	"github.com/docker/machine/libmachine/persist"
-	"github.com/docker/machine/libmachine/ssh"
+	lmssh "github.com/docker/machine/libmachine/ssh"
 	"github.com/docker/machine/libmachine/state"
 	"github.com/docker/machine/libmachine/swarm"
 	"github.com/docker/machine/libmachine/version"
@@ -48,13 +48,12 @@ import (
 	"k8s.io/minikube/pkg/minikube/localpath"
 	"k8s.io/minikube/pkg/minikube/out"
 	"k8s.io/minikube/pkg/minikube/registry"
-	"k8s.io/minikube/pkg/minikube/sshutil"
 )

 // NewRPCClient gets a new client.
 func NewRPCClient(storePath, certsDir string) libmachine.API {
 	c := libmachine.NewClient(storePath, certsDir)
-	c.SSHClientType = ssh.Native
+	c.SSHClientType = lmssh.Native
 	return c
 }
@@ -154,19 +153,7 @@ func CommandRunner(h *host.Host) (command.Runner, error) {
 		return command.NewExecRunner(), nil
 	}
-	if driver.IsKIC(h.Driver.DriverName()) {
-		return command.NewKICRunner(h.Name, h.Driver.DriverName()), nil
-	}
-	return SSHRunner(h)
-}
-
-// SSHRunner returns an SSH runner for the host
-func SSHRunner(h *host.Host) (command.Runner, error) {
-	client, err := sshutil.NewSSHClient(h.Driver)
-	if err != nil {
-		return nil, errors.Wrap(err, "getting ssh client for bootstrapper")
-	}
-	return command.NewSSHRunner(client), nil
+	return command.NewSSHRunner(h.Driver), nil
 }

 // Create creates the host
......
@@ -32,19 +32,15 @@ import (
 	"github.com/juju/mutex"
 	"github.com/pkg/errors"
 	"github.com/spf13/viper"
-	"golang.org/x/crypto/ssh"
 	"k8s.io/minikube/pkg/drivers/kic/oci"
 	"k8s.io/minikube/pkg/minikube/command"
 	"k8s.io/minikube/pkg/minikube/config"
 	"k8s.io/minikube/pkg/minikube/constants"
 	"k8s.io/minikube/pkg/minikube/driver"
 	"k8s.io/minikube/pkg/minikube/localpath"
 	"k8s.io/minikube/pkg/minikube/out"
 	"k8s.io/minikube/pkg/minikube/registry"
-	"k8s.io/minikube/pkg/minikube/sshutil"
 	"k8s.io/minikube/pkg/minikube/vmpath"
 	"k8s.io/minikube/pkg/util/lock"
-	"k8s.io/minikube/pkg/util/retry"
 )

 var (
@@ -198,7 +194,7 @@ func postStartSetup(h *host.Host, mc config.ClusterConfig) error {
 	glog.Infof("creating required directories: %v", requiredDirectories)

-	r, err := commandRunner(h)
+	r, err := CommandRunner(h)
 	if err != nil {
 		return errors.Wrap(err, "command runner")
 	}
@@ -217,40 +213,6 @@ func postStartSetup(h *host.Host, mc config.ClusterConfig) error {
 	return syncLocalAssets(r)
 }

-// commandRunner returns best available command runner for this host
-func commandRunner(h *host.Host) (command.Runner, error) {
-	d := h.Driver.DriverName()
-	glog.V(1).Infof("determining appropriate runner for %q", d)
-	if driver.IsMock(d) {
-		glog.Infof("returning FakeCommandRunner for %q driver", d)
-		return &command.FakeCommandRunner{}, nil
-	}
-	if driver.BareMetal(h.Driver.DriverName()) {
-		glog.Infof("returning ExecRunner for %q driver", d)
-		return command.NewExecRunner(), nil
-	}
-	if driver.IsKIC(d) {
-		glog.Infof("Returning KICRunner for %q driver", d)
-		return command.NewKICRunner(h.Name, d), nil
-	}
-
-	glog.Infof("Creating SSH client and returning SSHRunner for %q driver", d)
-
-	// Retry in order to survive an ssh restart, which sometimes happens due to provisioning
-	var sc *ssh.Client
-	getSSH := func() (err error) {
-		sc, err = sshutil.NewSSHClient(h.Driver)
-		return err
-	}
-
-	if err := retry.Expo(getSSH, 250*time.Millisecond, 2*time.Second); err != nil {
-		return nil, err
-	}
-	return command.NewSSHRunner(sc), nil
-}
-
 // acquireMachinesLock protects against code that is not parallel-safe (libmachine, cert setup)
 func acquireMachinesLock(name string) (mutex.Releaser, error) {
 	spec := lock.PathMutexSpec(filepath.Join(localpath.MiniPath(), "machines"))
......
@@ -89,18 +89,6 @@ func Start(starter Starter, apiServer bool) (*kubeconfig.Settings, error) {
 	cr := configureRuntimes(starter.Runner, *starter.Cfg, sv)
 	showVersionInfo(starter.Node.KubernetesVersion, cr)

-	// ssh should be set up by now
-	// switch to using ssh runner since it is faster
-	if driver.IsKIC(starter.Cfg.Driver) {
-		sshRunner, err := machine.SSHRunner(starter.Host)
-		if err != nil {
-			glog.Infof("error getting ssh runner: %v", err)
-		} else {
-			glog.Infof("Using ssh runner for kic...")
-			starter.Runner = sshRunner
-		}
-	}
-
 	var bs bootstrapper.Bootstrapper
 	var kcs *kubeconfig.Settings
 	if apiServer {
@@ -111,7 +99,7 @@ func Start(starter Starter, apiServer bool) (*kubeconfig.Settings, error) {
 		}

 		// setup kubeadm (must come after setupKubeconfig)
-		bs = setupKubeAdm(starter.MachineAPI, *starter.Cfg, *starter.Node)
+		bs = setupKubeAdm(starter.MachineAPI, *starter.Cfg, *starter.Node, starter.Runner)
 		err = bs.StartCluster(*starter.Cfg)
 		if err != nil {
@@ -124,7 +112,7 @@ func Start(starter Starter, apiServer bool) (*kubeconfig.Settings, error) {
 			return nil, errors.Wrap(err, "Failed to update kubeconfig file.")
 		}
 	} else {
-		bs, err = cluster.Bootstrapper(starter.MachineAPI, viper.GetString(cmdcfg.Bootstrapper), *starter.Cfg, *starter.Node)
+		bs, err = cluster.Bootstrapper(starter.MachineAPI, viper.GetString(cmdcfg.Bootstrapper), *starter.Cfg, starter.Runner)
 		if err != nil {
 			return nil, errors.Wrap(err, "Failed to get bootstrapper")
 		}
@@ -168,11 +156,7 @@ func Start(starter Starter, apiServer bool) (*kubeconfig.Settings, error) {
 		return nil, errors.Wrap(err, "Updating node")
 	}

-	cp, err := config.PrimaryControlPlane(starter.Cfg)
-	if err != nil {
-		return nil, errors.Wrap(err, "Getting primary control plane")
-	}
-	cpBs, err := cluster.Bootstrapper(starter.MachineAPI, viper.GetString(cmdcfg.Bootstrapper), *starter.Cfg, cp)
+	cpBs, err := cluster.Bootstrapper(starter.MachineAPI, viper.GetString(cmdcfg.Bootstrapper), *starter.Cfg, starter.Runner)
 	if err != nil {
 		return nil, errors.Wrap(err, "Getting bootstrapper")
 	}
@@ -268,8 +252,8 @@ func configureRuntimes(runner cruntime.CommandRunner, cc config.ClusterConfig, k
 }

 // setupKubeAdm adds any requested files into the VM before Kubernetes is started
-func setupKubeAdm(mAPI libmachine.API, cfg config.ClusterConfig, n config.Node) bootstrapper.Bootstrapper {
-	bs, err := cluster.Bootstrapper(mAPI, viper.GetString(cmdcfg.Bootstrapper), cfg, n)
+func setupKubeAdm(mAPI libmachine.API, cfg config.ClusterConfig, n config.Node, r command.Runner) bootstrapper.Bootstrapper {
+	bs, err := cluster.Bootstrapper(mAPI, viper.GetString(cmdcfg.Bootstrapper), cfg, r)
 	if err != nil {
 		exit.WithError("Failed to get bootstrapper", err)
 	}
......
@@ -19,11 +19,14 @@ package sshutil

 import (
 	"net"
 	"strconv"
+	"time"

 	"github.com/docker/machine/libmachine/drivers"
 	machinessh "github.com/docker/machine/libmachine/ssh"
+	"github.com/golang/glog"
 	"github.com/pkg/errors"
 	"golang.org/x/crypto/ssh"
+	"k8s.io/minikube/pkg/util/retry"
 )
// NewSSHClient returns an SSH client object for running commands.
......@@ -37,15 +40,27 @@ func NewSSHClient(d drivers.Driver) (*ssh.Client, error) {
if h.SSHKeyPath != "" {
auth.Keys = []string{h.SSHKeyPath}
}
glog.Infof("new ssh client: %+v", h)
config, err := machinessh.NewNativeConfig(h.Username, auth)
if err != nil {
return nil, errors.Wrapf(err, "Error creating new native config from ssh using: %s, %s", h.Username, auth)
}
client, err := ssh.Dial("tcp", net.JoinHostPort(h.IP, strconv.Itoa(h.Port)), &config)
if err != nil {
return nil, errors.Wrap(err, "Error dialing tcp via ssh client")
var client *ssh.Client
getSSH := func() (err error) {
client, err = ssh.Dial("tcp", net.JoinHostPort(h.IP, strconv.Itoa(h.Port)), &config)
if err != nil {
glog.Warningf("dial failure (will retry): %v", err)
}
return err
}
if err := retry.Expo(getSSH, 250*time.Millisecond, 2*time.Second); err != nil {
return nil, err
}
return client, nil
}
......
@@ -39,7 +39,6 @@ import (
 	"k8s.io/minikube/pkg/minikube/assets"
 	"k8s.io/minikube/pkg/minikube/command"
 	"k8s.io/minikube/pkg/minikube/config"
-	"k8s.io/minikube/pkg/minikube/sshutil"
 )

 // generic interface for minikube provisioner
@@ -165,11 +164,7 @@ func copyRemoteCerts(authOptions auth.Options, driver drivers.Driver) error {
 		authOptions.ServerKeyPath: authOptions.ServerKeyRemotePath,
 	}

-	sshClient, err := sshutil.NewSSHClient(driver)
-	if err != nil {
-		return errors.Wrap(err, "provisioning: error getting ssh client")
-	}
-	sshRunner := command.NewSSHRunner(sshClient)
+	sshRunner := command.NewSSHRunner(driver)

 	dirs := []string{}
 	for _, dst := range remoteCerts {
@@ -177,7 +172,7 @@ func copyRemoteCerts(authOptions auth.Options, driver drivers.Driver) error {
 	}

 	args := append([]string{"mkdir", "-p"}, dirs...)
-	if _, err = sshRunner.RunCmd(exec.Command("sudo", args...)); err != nil {
+	if _, err := sshRunner.RunCmd(exec.Command("sudo", args...)); err != nil {
 		return err
 	}
......
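The last hunk's `err =` to `err :=` change is required, not cosmetic: the `err` it previously assigned to was declared by the now-deleted `sshClient, err :=` line, so the plain assignment would no longer compile. Declaring a fresh `err` scoped to the `if` statement is the idiomatic replacement. A small illustration of that scoping (the `run` function is hypothetical):

```go
package main

import (
	"fmt"
	"os/exec"
)

func run() error {
	// err here is scoped to the if statement; it does not require (or
	// shadow) any err variable declared in the surrounding function.
	if _, err := exec.Command("true").Output(); err != nil {
		return err
	}
	fmt.Println("command succeeded")
	return nil
}

func main() {
	if err := run(); err != nil {
		fmt.Println("run failed:", err)
	}
}
```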