提交 e535268d 编写于 作者: T Thomas Stromberg

Use kubeadm to restart kube-proxy rather than rolling our own

上级 aca2c6e4
......@@ -155,6 +155,7 @@ func init() {
startCmd.Flags().Bool(noVTXCheck, false, "Disable checking for the availability of hardware virtualization before the vm is started (virtualbox)")
if err := viper.BindPFlags(startCmd.Flags()); err != nil {
exit.WithError("unable to bind flags", err)
......@@ -244,9 +245,6 @@ func runStart(cmd *cobra.Command, args []string) {
// The kube config must be update must come before bootstrapping, otherwise health checks may use a stale IP
kubeconfig := updateKubeConfig(host, &config)
bootstrapCluster(bs, cr, runner, config.KubernetesConfig, preexisting, isUpgrade)
apiserverPort := config.KubernetesConfig.NodePort
validateCluster(bs, cr, runner, ip, apiserverPort)
if err = LoadCachedImagesInConfigFile(); err != nil {
console.Failure("Unable to load cached images from config file.")
......@@ -257,6 +255,9 @@ func runStart(cmd *cobra.Command, args []string) {
if err := bs.WaitCluster(config.KubernetesConfig); err != nil {
exit.WithError("Wait failed", err)
......@@ -39,6 +39,7 @@ type Bootstrapper interface {
UpdateCluster(config.KubernetesConfig) error
RestartCluster(config.KubernetesConfig) error
DeleteCluster(config.KubernetesConfig) error
WaitCluster(config.KubernetesConfig) error
// LogCommands returns a map of log type to a command which will display that log.
LogCommands(LogOptions) map[string]string
SetupCerts(cfg config.KubernetesConfig) error
......@@ -33,6 +33,7 @@ import (
......@@ -214,20 +215,10 @@ func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {
if err := waitForPods(k8s, false); err != nil {
return errors.Wrap(err, "wait")
glog.Infof("Configuring cluster permissions ...")
if err := util.RetryAfter(100, elevateKubeSystemPrivileges, time.Millisecond*500); err != nil {
return errors.Wrap(err, "timed out waiting to elevate kube-system RBAC privileges")
// Make sure elevating privileges didn't screw anything up
if err := waitForPods(k8s, true); err != nil {
return errors.Wrap(err, "wait")
return nil
......@@ -260,16 +251,13 @@ func addAddons(files *[]assets.CopyableFile, data interface{}) error {
return nil
// waitForPods waits until the important Kubernetes pods are in running state
func waitForPods(k8s config.KubernetesConfig, quiet bool) error {
// WaitCluster blocks until Kubernetes appears to be healthy.
func (k *Bootstrapper) WaitCluster(k8s config.KubernetesConfig) error {
// Do not wait for "k8s-app" pods in the case of CNI, as they are managed
// by a CNI plugin which is usually started after minikube has been brought
// up. Otherwise, minikube won't start, as "k8s-app" pods are not ready.
componentsOnly := k8s.NetworkPlugin == "cni"
if !quiet {
console.OutStyle("waiting-pods", "Waiting for:")
console.OutStyle("waiting-pods", "Verifying: ")
client, err := util.GetClient()
if err != nil {
return errors.Wrap(err, "k8s client")
......@@ -280,17 +268,13 @@ func waitForPods(k8s config.KubernetesConfig, quiet bool) error {
if !quiet {
console.Out(" %s", p.name)
console.Out(" %s", p.name)
selector := labels.SelectorFromSet(labels.Set(map[string]string{p.key: p.value}))
if err := util.WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil {
return errors.Wrap(err, fmt.Sprintf("waiting for %s=%s", p.key, p.value))
if !quiet {
return nil
......@@ -308,11 +292,13 @@ func (k *Bootstrapper) RestartCluster(k8s config.KubernetesConfig) error {
controlPlane = "control-plane"
configPath := constants.KubeadmConfigFile
baseCmd := fmt.Sprintf("sudo kubeadm %s", phase)
cmds := []string{
fmt.Sprintf("sudo kubeadm %s phase certs all --config %s", phase, constants.KubeadmConfigFile),
fmt.Sprintf("sudo kubeadm %s phase kubeconfig all --config %s", phase, constants.KubeadmConfigFile),
fmt.Sprintf("sudo kubeadm %s phase %s all --config %s", phase, controlPlane, constants.KubeadmConfigFile),
fmt.Sprintf("sudo kubeadm %s phase etcd local --config %s", phase, constants.KubeadmConfigFile),
fmt.Sprintf("%s phase certs all --config %s", baseCmd, configPath),
fmt.Sprintf("%s phase kubeconfig all --config %s", baseCmd, configPath),
fmt.Sprintf("%s phase %s all --config %s", baseCmd, controlPlane, configPath),
fmt.Sprintf("%s phase etcd local --config %s", baseCmd, configPath),
// Run commands one at a time so that it is easier to root cause failures.
......@@ -322,23 +308,33 @@ func (k *Bootstrapper) RestartCluster(k8s config.KubernetesConfig) error {
if err := waitForPods(k8s, false); err != nil {
return errors.Wrap(err, "wait")
if err := k.waitForAPIServer(k8s); err != nil {
return errors.Wrap(err, "waiting for apiserver")
console.OutStyle("reconfiguring", "Updating kube-proxy configuration ...")
if err = util.RetryAfter(5, func() error { return updateKubeProxyConfigMap(k8s) }, 5*time.Second); err != nil {
return errors.Wrap(err, "restarting kube-proxy")
// Make sure the kube-proxy restart didn't screw anything up.
if err := waitForPods(k8s, true); err != nil {
return errors.Wrap(err, "wait")
// restart the proxy and coredns
if err := k.c.Run(fmt.Sprintf("%s phase addon all --config %s", baseCmd, configPath)); err != nil {
return errors.Wrapf(err, "addon phase")
return nil
// waitForAPIServer waits for the apiserver to start up
func (k *Bootstrapper) waitForAPIServer(k8s config.KubernetesConfig) error {
glog.Infof("Waiting for apiserver ...")
defer glog.Infof("Done waiting for apiserver ...")
return wait.PollImmediate(time.Millisecond*200, time.Minute*1, func() (bool, error) {
status, err := k.GetAPIServerStatus(net.ParseIP(k8s.NodeIP), k8s.NodePort)
glog.Infof("status: %s, err: %v", status, err)
if err != nil {
return false, err
if status != "Running" {
return false, nil
return true, nil
// DeleteCluster removes the components that were started earlier
func (k *Bootstrapper) DeleteCluster(k8s config.KubernetesConfig) error {
cmd := fmt.Sprintf("sudo kubeadm reset --force")
......@@ -17,11 +17,8 @@ limitations under the License.
package kubeadm
import (
......@@ -29,10 +26,8 @@ import (
rbac "k8s.io/api/rbac/v1beta1"
apierr "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
......@@ -130,98 +125,3 @@ func elevateKubeSystemPrivileges() error {
return nil
const (
kubeconfigConf = "kubeconfig.conf"
kubeProxyConfigmapTmpl = `apiVersion: v1
kind: Config
- cluster:
certificate-authority: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
server: https://{{.AdvertiseAddress}}:{{.APIServerPort}}
name: default
- context:
cluster: default
namespace: default
user: default
name: default
current-context: default
- name: default
tokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token
// updateKubeProxyConfigMap updates the IP & port kube-proxy listens on, and restarts it.
func updateKubeProxyConfigMap(k8s config.KubernetesConfig) error {
client, err := util.GetClient()
if err != nil {
return errors.Wrap(err, "getting k8s client")
selector := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "kube-proxy"}))
if err := util.WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil {
return errors.Wrap(err, "kube-proxy not running")
cfgMap, err := client.CoreV1().ConfigMaps("kube-system").Get("kube-proxy", meta.GetOptions{})
if err != nil {
return &util.RetriableError{Err: errors.Wrap(err, "getting kube-proxy configmap")}
glog.Infof("kube-proxy config: %v", cfgMap.Data[kubeconfigConf])
t := template.Must(template.New("kubeProxyTmpl").Parse(kubeProxyConfigmapTmpl))
opts := struct {
AdvertiseAddress string
APIServerPort int
AdvertiseAddress: k8s.NodeIP,
APIServerPort: k8s.NodePort,
kubeconfig := bytes.Buffer{}
if err := t.Execute(&kubeconfig, opts); err != nil {
return errors.Wrap(err, "executing kube proxy configmap template")
if cfgMap.Data == nil {
cfgMap.Data = map[string]string{}
updated := strings.TrimSuffix(kubeconfig.String(), "\n")
glog.Infof("updated kube-proxy config: %s", updated)
// An optimization, but also one that's unlikely, as kubeadm writes the address as 'localhost'
if cfgMap.Data[kubeconfigConf] == updated {
glog.Infof("kube-proxy config appears to require no change, not restarting kube-proxy")
return nil
cfgMap.Data[kubeconfigConf] = updated
// Make this step retriable, as it can fail with:
// "Operation cannot be fulfilled on configmaps "kube-proxy": the object has been modified; please apply your changes to the latest version and try again"
if _, err := client.CoreV1().ConfigMaps("kube-system").Update(cfgMap); err != nil {
return &util.RetriableError{Err: errors.Wrap(err, "updating configmap")}
pods, err := client.CoreV1().Pods("kube-system").List(meta.ListOptions{
LabelSelector: "k8s-app=kube-proxy",
if err != nil {
return errors.Wrap(err, "listing kube-proxy pods")
for _, pod := range pods.Items {
// Retriable, as known to fail with: pods "<name>" not found
if err := client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &meta.DeleteOptions{}); err != nil {
return &util.RetriableError{Err: errors.Wrapf(err, "deleting pod %+v", pod)}
// Wait for the scheduler to restart kube-proxy
if err := util.WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil {
return errors.Wrap(err, "kube-proxy not running")
return nil
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册