未验证 提交 5a5f869c 编写于 作者: T Thomas Strömberg 提交者: GitHub

Merge pull request #7124 from tstromberg/less-kubeadm

Skip kubeadm if cluster is running & properly configured
......@@ -50,6 +50,11 @@ func TransferBinaries(cfg config.KubernetesConfig, c command.Runner) error {
return err
}
// stop kubelet to avoid "Text File Busy" error
if _, err := c.RunCmd(exec.Command("/bin/bash", "-c", "pgrep kubelet && sudo systemctl stop kubelet")); err != nil {
glog.Warningf("unable to stop kubelet: %s", err)
}
var g errgroup.Group
for _, name := range constants.KubernetesReleaseBinaries {
name := name
......
......@@ -40,9 +40,9 @@ const (
// ConfigFileAssets returns configuration file assets
func ConfigFileAssets(cfg config.KubernetesConfig, kubeadm []byte, kubelet []byte, kubeletSvc []byte, defaultCNIConfig []byte) []assets.CopyableFile {
fs := []assets.CopyableFile{
assets.NewMemoryAssetTarget(kubeadm, KubeadmYamlPath, "0640"),
assets.NewMemoryAssetTarget(kubelet, KubeletSystemdConfFile, "0644"),
assets.NewMemoryAssetTarget(kubeletSvc, KubeletServiceFile, "0644"),
assets.NewMemoryAssetTarget(kubeadm, KubeadmYamlPath+".new", "0640"),
assets.NewMemoryAssetTarget(kubelet, KubeletSystemdConfFile+".new", "0644"),
assets.NewMemoryAssetTarget(kubeletSvc, KubeletServiceFile+".new", "0644"),
}
// Copy the default CNI config (k8s.conf), so that kubelet can successfully
// start a Pod in the case a user hasn't manually installed any CNI plugin
......
......@@ -30,6 +30,7 @@ import (
"github.com/docker/machine/libmachine/state"
"github.com/golang/glog"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
......@@ -78,6 +79,68 @@ func apiServerPID(cr command.Runner) (int, error) {
return strconv.Atoi(s)
}
// ExpectedComponentsRunning returns whether or not all expected components are running
func ExpectedComponentsRunning(cs *kubernetes.Clientset) error {
expected := []string{
"kube-dns", // coredns
"etcd",
"kube-apiserver",
"kube-controller-manager",
"kube-proxy",
"kube-scheduler",
}
found := map[string]bool{}
pods, err := cs.CoreV1().Pods("kube-system").List(meta.ListOptions{})
if err != nil {
return err
}
for _, pod := range pods.Items {
glog.Infof("found pod: %s", podStatusMsg(pod))
if pod.Status.Phase != core.PodRunning {
continue
}
for k, v := range pod.ObjectMeta.Labels {
if k == "component" || k == "k8s-app" {
found[v] = true
}
}
}
missing := []string{}
for _, e := range expected {
if !found[e] {
missing = append(missing, e)
}
}
if len(missing) > 0 {
return fmt.Errorf("missing components: %v", strings.Join(missing, ", "))
}
return nil
}
// podStatusMsg returns a human-readable pod status, for generating debug status
func podStatusMsg(pod core.Pod) string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("%q [%s] %s", pod.ObjectMeta.GetName(), pod.ObjectMeta.GetUID(), pod.Status.Phase))
for i, c := range pod.Status.Conditions {
if c.Reason != "" {
if i == 0 {
sb.WriteString(": ")
} else {
sb.WriteString(" / ")
}
sb.WriteString(fmt.Sprintf("%s:%s", c.Type, c.Reason))
}
if c.Message != "" {
sb.WriteString(fmt.Sprintf(" (%s)", c.Message))
}
}
return sb.String()
}
// WaitForSystemPods verifies essential pods for running kurnetes is running
func WaitForSystemPods(r cruntime.Manager, bs bootstrapper.Bootstrapper, cr command.Runner, client *kubernetes.Clientset, start time.Time, timeout time.Duration) error {
glog.Info("waiting for kube-system pods to appear ...")
......@@ -99,6 +162,10 @@ func WaitForSystemPods(r cruntime.Manager, bs bootstrapper.Bootstrapper, cr comm
return false, nil
}
glog.Infof("%d kube-system pods found", len(pods.Items))
for _, pod := range pods.Items {
glog.Infof(podStatusMsg(pod))
}
if len(pods.Items) < 2 {
return false, nil
}
......@@ -159,7 +226,7 @@ func APIServerStatus(cr command.Runner, ip net.IP, port int) (state.State, error
pid, err := apiServerPID(cr)
if err != nil {
glog.Warningf("unable to get apiserver pid: %v", err)
glog.Warningf("stopped: unable to get apiserver pid: %v", err)
return state.Stopped, nil
}
......@@ -205,6 +272,7 @@ func apiServerHealthz(ip net.IP, port int) (state.State, error) {
resp, err := client.Get(url)
// Connection refused, usually.
if err != nil {
glog.Infof("stopped: %s: %v", url, err)
return state.Stopped, nil
}
if resp.StatusCode == http.StatusUnauthorized {
......
......@@ -209,7 +209,8 @@ func (k *Bootstrapper) StartCluster(cfg config.ClusterConfig) error {
}
c = exec.Command("/bin/bash", "-c", fmt.Sprintf("%s init --config %s %s --ignore-preflight-errors=%s", bsutil.InvokeKubeadm(cfg.KubernetesConfig.KubernetesVersion), bsutil.KubeadmYamlPath, extraFlags, strings.Join(ignore, ",")))
conf := bsutil.KubeadmYamlPath
c = exec.Command("/bin/bash", "-c", fmt.Sprintf("sudo mv %s.new %s && %s init --config %s %s --ignore-preflight-errors=%s", conf, conf, bsutil.InvokeKubeadm(cfg.KubernetesConfig.KubernetesVersion), conf, extraFlags, strings.Join(ignore, ",")))
rr, err := k.c.RunCmd(c)
if err != nil {
return errors.Wrapf(err, "init failed. output: %q", rr.Output())
......@@ -236,6 +237,20 @@ func (k *Bootstrapper) StartCluster(cfg config.ClusterConfig) error {
return nil
}
func (k *Bootstrapper) controlPlaneEndpoint(cfg config.ClusterConfig) (string, int, error) {
cp, err := config.PrimaryControlPlane(&cfg)
if err != nil {
return "", 0, err
}
if driver.IsKIC(cfg.Driver) {
ip := oci.DefaultBindIPV4
port, err := oci.ForwardedPort(cfg.Driver, cfg.Name, cp.Port)
return ip, port, err
}
return cp.IP, cp.Port, nil
}
// client sets and returns a Kubernetes client to use to speak to a kubeadm launched apiserver
func (k *Bootstrapper) client(ip string, port int) (*kubernetes.Clientset, error) {
if k.k8sClient != nil {
......@@ -262,34 +277,28 @@ func (k *Bootstrapper) client(ip string, port int) (*kubernetes.Clientset, error
// WaitForNode blocks until the node appears to be healthy
func (k *Bootstrapper) WaitForNode(cfg config.ClusterConfig, n config.Node, timeout time.Duration) error {
start := time.Now()
out.T(out.Waiting, "Waiting for cluster to come online ...")
if !n.ControlPlane {
glog.Infof("%s is not a control plane, nothing to wait for", n.Name)
return nil
}
cr, err := cruntime.New(cruntime.Config{Type: cfg.KubernetesConfig.ContainerRuntime, Runner: k.c})
if err != nil {
return err
}
if n.ControlPlane {
if err := kverify.WaitForAPIServerProcess(cr, k, k.c, start, timeout); err != nil {
return err
}
if err := kverify.WaitForAPIServerProcess(cr, k, k.c, start, timeout); err != nil {
return err
}
ip := n.IP
port := n.Port
if driver.IsKIC(cfg.Driver) {
ip = oci.DefaultBindIPV4
p, err := oci.ForwardedPort(cfg.Driver, driver.MachineName(cfg, n), port)
if err != nil {
return errors.Wrapf(err, "get host-bind port %d for container %s", port, cfg.Name)
}
port = p
ip, port, err := k.controlPlaneEndpoint(cfg)
if err != nil {
return err
}
if n.ControlPlane {
if err := kverify.WaitForHealthyAPIServer(cr, k, k.c, start, ip, port, timeout); err != nil {
return err
}
if err := kverify.WaitForHealthyAPIServer(cr, k, k.c, start, ip, port, timeout); err != nil {
return err
}
c, err := k.client(ip, port)
......@@ -303,6 +312,31 @@ func (k *Bootstrapper) WaitForNode(cfg config.ClusterConfig, n config.Node, time
return nil
}
// needsReset returns whether or not the cluster needs to be reconfigured
func (k *Bootstrapper) needsReset(conf string, ip string, port int, client *kubernetes.Clientset) bool {
if _, err := k.c.RunCmd(exec.Command("sudo", "diff", "-u", conf, conf+".new")); err != nil {
glog.Infof("needs reset: configs differ")
return true
}
st, err := kverify.APIServerStatus(k.c, net.ParseIP(ip), port)
if err != nil {
glog.Infof("needs reset: apiserver error: %v", err)
return true
}
if st != state.Running {
glog.Infof("needs reset: apiserver in state %s", st)
return true
}
if err := kverify.ExpectedComponentsRunning(client); err != nil {
glog.Infof("needs reset: %v", err)
return true
}
return false
}
// restartCluster restarts the Kubernetes cluster configured by kubeadm
func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {
glog.Infof("restartCluster start")
......@@ -328,14 +362,36 @@ func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {
glog.Errorf("failed to create compat symlinks: %v", err)
}
ip, port, err := k.controlPlaneEndpoint(cfg)
if err != nil {
return errors.Wrap(err, "control plane")
}
client, err := k.client(ip, port)
if err != nil {
return errors.Wrap(err, "getting k8s client")
}
// If the cluster is running, check if we have any work to do.
conf := bsutil.KubeadmYamlPath
if !k.needsReset(conf, ip, port, client) {
glog.Infof("Taking a shortcut, as the cluster seems to be properly configured")
return nil
}
if _, err := k.c.RunCmd(exec.Command("sudo", "mv", conf+".new", conf)); err != nil {
return errors.Wrap(err, "mv")
}
baseCmd := fmt.Sprintf("%s %s", bsutil.InvokeKubeadm(cfg.KubernetesConfig.KubernetesVersion), phase)
cmds := []string{
fmt.Sprintf("%s phase certs all --config %s", baseCmd, bsutil.KubeadmYamlPath),
fmt.Sprintf("%s phase kubeconfig all --config %s", baseCmd, bsutil.KubeadmYamlPath),
fmt.Sprintf("%s phase %s all --config %s", baseCmd, controlPlane, bsutil.KubeadmYamlPath),
fmt.Sprintf("%s phase etcd local --config %s", baseCmd, bsutil.KubeadmYamlPath),
fmt.Sprintf("%s phase certs all --config %s", baseCmd, conf),
fmt.Sprintf("%s phase kubeconfig all --config %s", baseCmd, conf),
fmt.Sprintf("%s phase %s all --config %s", baseCmd, controlPlane, conf),
fmt.Sprintf("%s phase etcd local --config %s", baseCmd, conf),
}
glog.Infof("resetting cluster from %s", conf)
// Run commands one at a time so that it is easier to root cause failures.
for _, c := range cmds {
rr, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", c))
......@@ -346,7 +402,7 @@ func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {
cr, err := cruntime.New(cruntime.Config{Type: cfg.KubernetesConfig.ContainerRuntime, Runner: k.c})
if err != nil {
return err
return errors.Wrap(err, "runtime")
}
// We must ensure that the apiserver is healthy before proceeding
......@@ -354,30 +410,11 @@ func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {
return errors.Wrap(err, "apiserver healthz")
}
cp, err := config.PrimaryControlPlane(&cfg)
if err != nil {
return errors.Wrap(err, "getting control plane")
}
ip := cp.IP
port := cp.Port
if driver.IsKIC(cfg.Driver) {
ip = oci.DefaultBindIPV4
port, err = oci.ForwardedPort(cfg.Driver, driver.MachineName(cfg, cp), port)
if err != nil {
return errors.Wrapf(err, "get host-bind port %d for container %s", port, driver.MachineName(cfg, cp))
}
}
client, err := k.client(ip, port)
if err != nil {
return errors.Wrap(err, "getting k8s client")
}
if err := kverify.WaitForSystemPods(cr, k, k.c, client, time.Now(), kconst.DefaultControlPlaneTimeout); err != nil {
return errors.Wrap(err, "system pods")
}
// Explicitly re-enable kubeadm addons (proxy, coredns) so that they will check for IP or configuration changes.
if rr, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", fmt.Sprintf("%s phase addon all --config %s", baseCmd, bsutil.KubeadmYamlPath))); err != nil {
if rr, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", fmt.Sprintf("%s phase addon all --config %s", baseCmd, conf))); err != nil {
return errors.Wrapf(err, fmt.Sprintf("addon phase cmd:%q", rr.Command()))
}
......@@ -496,11 +533,6 @@ func (k *Bootstrapper) UpdateNode(cfg config.ClusterConfig, n config.Node, r cru
glog.Infof("kubelet %s config:\n%+v", kubeletCfg, cfg.KubernetesConfig)
// stop kubelet to avoid "Text File Busy" error
if err := stopKubelet(k.c); err != nil {
glog.Warningf("unable to stop kubelet: %s", err)
}
if err := bsutil.TransferBinaries(cfg.KubernetesConfig, k.c); err != nil {
return errors.Wrap(err, "downloading binaries")
}
......@@ -509,25 +541,19 @@ func (k *Bootstrapper) UpdateNode(cfg config.ClusterConfig, n config.Node, r cru
if cfg.KubernetesConfig.EnableDefaultCNI {
cniFile = []byte(defaultCNIConfig)
}
// Install assets into temporary files
files := bsutil.ConfigFileAssets(cfg.KubernetesConfig, kubeadmCfg, kubeletCfg, kubeletService, cniFile)
if err := copyFiles(k.c, files); err != nil {
return err
}
if err := startKubelet(k.c); err != nil {
if err := reloadKubelet(k.c); err != nil {
return err
}
return nil
}
func stopKubelet(runner command.Runner) error {
stopCmd := exec.Command("/bin/bash", "-c", "pgrep kubelet && sudo systemctl stop kubelet")
if rr, err := runner.RunCmd(stopCmd); err != nil {
return errors.Wrapf(err, "command: %q output: %q", rr.Command(), rr.Output())
}
return nil
}
func copyFiles(runner command.Runner, files []assets.CopyableFile) error {
// Combine mkdir request into a single call to reduce load
dirs := []string{}
......@@ -547,8 +573,17 @@ func copyFiles(runner command.Runner, files []assets.CopyableFile) error {
return nil
}
func startKubelet(runner command.Runner) error {
startCmd := exec.Command("/bin/bash", "-c", "sudo systemctl daemon-reload && sudo systemctl start kubelet")
func reloadKubelet(runner command.Runner) error {
svc := bsutil.KubeletServiceFile
conf := bsutil.KubeletSystemdConfFile
checkCmd := exec.Command("/bin/bash", "-c", fmt.Sprintf("pgrep kubelet && diff -u %s %s.new && diff -u %s %s.new", svc, svc, conf, conf))
if _, err := runner.RunCmd(checkCmd); err == nil {
glog.Infof("kubelet is already running with the right configs")
return nil
}
startCmd := exec.Command("/bin/bash", "-c", fmt.Sprintf("sudo mv %s.new %s && sudo mv %s.new %s && sudo systemctl daemon-reload && sudo systemctl restart kubelet", svc, svc, conf, conf))
if _, err := runner.RunCmd(startCmd); err != nil {
return errors.Wrap(err, "starting kubelet")
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册