提交 43f5851e 编写于 作者: D Dan Lorenc

Fix race condition in cluster setup.

We now explicitly generate certs before starting the cluster.
上级 2cee5ca3
......@@ -22,6 +22,7 @@ import (
flag "github.com/spf13/pflag"
"k8s.io/minikube/pkg/localkube"
"k8s.io/minikube/pkg/util"
)
func NewLocalkubeServer() *localkube.LocalkubeServer {
......@@ -31,14 +32,15 @@ func NewLocalkubeServer() *localkube.LocalkubeServer {
return &localkube.LocalkubeServer{
Containerized: false,
EnableDNS: true,
DNSDomain: "cluster.local",
DNSDomain: util.DNSDomain,
DNSIP: net.ParseIP("10.0.0.10"),
LocalkubeDirectory: "/var/lib/localkube",
LocalkubeDirectory: util.LocalkubeDirectory,
ServiceClusterIPRange: *defaultServiceClusterIPRange,
APIServerAddress: net.ParseIP("0.0.0.0"),
APIServerPort: 443,
APIServerInsecureAddress: net.ParseIP("127.0.0.1"),
APIServerInsecurePort: 8080,
ShouldGenerateCerts: true,
}
}
......@@ -54,6 +56,7 @@ func AddFlags(s *localkube.LocalkubeServer) {
flag.IntVar(&s.APIServerPort, "apiserver-port", s.APIServerPort, "The port the apiserver will listen securely on")
flag.IPVar(&s.APIServerInsecureAddress, "apiserver-insecure-address", s.APIServerInsecureAddress, "The address the apiserver will listen insecurely on")
flag.IntVar(&s.APIServerInsecurePort, "apiserver-insecure-port", s.APIServerInsecurePort, "The port the apiserver will listen insecurely on")
flag.BoolVar(&s.ShouldGenerateCerts, "generate-certs", s.ShouldGenerateCerts, "If localkube should generate it's own certificates")
// These two come from vendor/ packages that use flags. We should hide them
flag.CommandLine.MarkHidden("google-json-key")
......
......@@ -55,12 +55,12 @@ func init() {
}
func SetupServer(s *localkube.LocalkubeServer) {
if err := s.GenerateCerts(); err != nil {
fmt.Println("Failed to create certificates!")
panic(err)
if s.ShouldGenerateCerts {
if err := s.GenerateCerts(); err != nil {
fmt.Println("Failed to create certificates!")
panic(err)
}
}
// setup etcd
etcd, err := s.NewEtcd(localkube.KubeEtcdClientURLs, localkube.KubeEtcdPeerURLs, "kubeetcd", s.GetEtcdDataDirectory())
if err != nil {
......
......@@ -64,6 +64,11 @@ func runStart(cmd *cobra.Command, args []string) {
os.Exit(1)
}
if err := cluster.SetupCerts(host.Driver); err != nil {
log.Println("Error configuring authentication: ", err)
os.Exit(1)
}
if err := cluster.StartCluster(host); err != nil {
log.Println("Error starting cluster: ", err)
os.Exit(1)
......@@ -89,11 +94,6 @@ func runStart(cmd *cobra.Command, args []string) {
fmt.Println("Run this command to use the cluster: ")
fmt.Printf("kubectl config use-context %s\n", name)
}
if err := cluster.GetCreds(host); err != nil {
log.Println("Error configuring authentication: ", err)
os.Exit(1)
}
}
// setupKubeconfig reads config from disk, adds the minikube settings, and writes it back.
......
......@@ -47,6 +47,7 @@ type LocalkubeServer struct {
APIServerPort int
APIServerInsecureAddress net.IP
APIServerInsecurePort int
ShouldGenerateCerts bool
}
func (lk *LocalkubeServer) AddServer(server Server) {
......@@ -153,9 +154,8 @@ func (lk LocalkubeServer) GenerateCerts() error {
return nil
}
fmt.Println("Creating cert with IPs: ", ips)
alternateDNS := []string{fmt.Sprintf("%s.%s", "kubernetes.default.svc", lk.DNSDomain), "kubernetes.default.svc", "kubernetes.default", "kubernetes"}
if err := GenerateSelfSignedCert(lk.GetPublicKeyCertPath(), lk.GetPrivateKeyCertPath(), ips, alternateDNS); err != nil {
if err := util.GenerateSelfSignedCert(lk.GetPublicKeyCertPath(), lk.GetPrivateKeyCertPath(), ips, util.GetAlternateDNS(lk.DNSDomain)); err != nil {
fmt.Println("Failed to create certs: ", err)
return err
}
......
......@@ -21,6 +21,7 @@ import (
"fmt"
"io/ioutil"
"log"
"net"
"path/filepath"
"strings"
"time"
......@@ -32,10 +33,7 @@ import (
"github.com/docker/machine/libmachine/state"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/sshutil"
)
const (
remotePath = "/var/lib/localkube/certs"
"k8s.io/minikube/pkg/util"
)
var (
......@@ -150,10 +148,14 @@ type MachineConfig struct {
// StartCluster starts a k8s cluster on the specified Host.
func StartCluster(h sshAble) error {
output, err := h.RunSSHCommand(getStartCommand())
log.Println(output)
if err != nil {
return err
commands := []string{startCommand}
for _, cmd := range commands {
output, err := h.RunSSHCommand(cmd)
log.Println(output)
if err != nil {
return err
}
}
return nil
......@@ -177,18 +179,33 @@ func UpdateCluster(d drivers.Driver) error {
return nil
}
// GetCreds gets the generated credentials required to talk to the APIServer.
func GetCreds(h sshAble) error {
// SetupCerts gets the generated credentials required to talk to the APIServer.
func SetupCerts(d drivers.Driver) error {
localPath := constants.Minipath
ipStr, err := d.GetIP()
if err != nil {
return err
}
ip := net.ParseIP(ipStr)
publicPath := filepath.Join(localPath, "apiserver.crt")
privatePath := filepath.Join(localPath, "apiserver.key")
if err := GenerateCerts(publicPath, privatePath, ip); err != nil {
return err
}
client, err := sshutil.NewSSHClient(d)
if err != nil {
return err
}
for _, cert := range certs {
remoteCertPath := filepath.Join(remotePath, cert)
localCertPath := filepath.Join(localPath, cert)
data, err := h.RunSSHCommand(fmt.Sprintf("sudo cat %s", remoteCertPath))
p := filepath.Join(localPath, cert)
data, err := ioutil.ReadFile(p)
if err != nil {
return err
}
if err := ioutil.WriteFile(localCertPath, []byte(data), 0644); err != nil {
if err := sshutil.Transfer(data, util.CertPath, cert, "0644", client); err != nil {
return err
}
}
......
......@@ -17,14 +17,14 @@ limitations under the License.
package cluster
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"github.com/docker/machine/libmachine/drivers"
"github.com/docker/machine/libmachine/host"
"github.com/docker/machine/libmachine/state"
"k8s.io/minikube/pkg/minikube/constants"
......@@ -67,9 +67,18 @@ func TestCreateHost(t *testing.T) {
type mockHost struct {
CommandOutput map[string]string
Error string
Commands map[string]int
}
func newMockHost() *mockHost {
return &mockHost{
CommandOutput: make(map[string]string),
Commands: make(map[string]int),
}
}
func (m mockHost) RunSSHCommand(cmd string) (string, error) {
m.Commands[cmd] = 1
output, ok := m.CommandOutput[cmd]
if ok {
return output, nil
......@@ -81,15 +90,23 @@ func (m mockHost) RunSSHCommand(cmd string) (string, error) {
}
func TestStartCluster(t *testing.T) {
h := mockHost{}
h := newMockHost()
err := StartCluster(h)
if err != nil {
t.Fatalf("Error starting cluster: %s", err)
}
for _, cmd := range []string{startCommand} {
if _, ok := h.Commands[cmd]; !ok {
t.Fatalf("Expected command not run: %s. Commands run: %s", cmd, h.Commands)
}
}
}
func TestStartClusterError(t *testing.T) {
h := mockHost{Error: "error"}
h := newMockHost()
h.Error = "error"
err := StartCluster(h)
if err == nil {
t.Fatal("Error not thrown starting cluster.")
......@@ -284,52 +301,33 @@ func TestGetHostStatus(t *testing.T) {
checkState(state.Stopped.String())
}
func TestGetCreds(t *testing.T) {
m := make(map[string]string)
for _, cert := range certs {
m[fmt.Sprintf("sudo cat %s/%s", remotePath, cert)] = cert
}
h := mockHost{CommandOutput: m}
tempDir := tests.MakeTempDir()
defer os.RemoveAll(tempDir)
if err := GetCreds(h); err != nil {
t.Fatalf("Error starting cluster: %s", err)
func TestSetupCerts(t *testing.T) {
s, _ := tests.NewSSHServer()
port, err := s.Start()
if err != nil {
t.Fatalf("Error starting ssh server: %s", err)
}
for _, cert := range certs {
// Files should be created with contents matching the output.
certPath := filepath.Join(tempDir, cert)
contents, err := ioutil.ReadFile(certPath)
if err != nil {
t.Fatalf("Error %s reading file: %s", err, certPath)
}
if !reflect.DeepEqual(contents, []byte(cert)) {
t.Fatalf("Contents of file are: %s, should be %s", contents, cert)
}
d := &tests.MockDriver{
Port: port,
BaseDriver: drivers.BaseDriver{
IPAddress: "127.0.0.1",
SSHKeyPath: "",
},
}
}
func TestGetCredsError(t *testing.T) {
h := mockHost{
Error: "error getting creds",
}
tempDir := tests.MakeTempDir()
defer os.RemoveAll(tempDir)
if err := GetCreds(h); err == nil {
t.Fatalf("Error should have been thrown, but was not.")
if err := SetupCerts(d); err != nil {
t.Fatalf("Error starting cluster: %s", err)
}
// No files should have been created.
for _, cert := range certs {
certPath := filepath.Join(tempDir, cert)
_, err := os.Stat(certPath)
if !os.IsNotExist(err) {
t.Fatalf("File %s should not exist.", certPath)
contents, _ := ioutil.ReadFile(cert)
transferred := s.Transfers.Bytes()
if !bytes.Contains(transferred, contents) {
t.Fatalf("Certificate not copied. Expected transfers to contain %s. It was: %s", contents, transferred)
}
}
}
......@@ -2,9 +2,5 @@ package cluster
var startCommand = `
# Run with nohup so it stays up. Redirect logs to useful places.
PATH=/usr/local/sbin:$PATH nohup sudo /usr/local/bin/localkube start > /var/log/localkube.out 2> /var/log/localkube.err < /dev/null &
PATH=/usr/local/sbin:$PATH nohup sudo /usr/local/bin/localkube start --generate-certs=false > /var/log/localkube.out 2> /var/log/localkube.err < /dev/null &
`
func getStartCommand() string {
return startCommand
}
package cluster
import (
"net"
"k8s.io/minikube/pkg/util"
)
func GenerateCerts(pub, priv string, ip net.IP) error {
ips := []net.IP{ip}
if err := util.GenerateSelfSignedCert(pub, priv, ips, util.GetAlternateDNS(util.DNSDomain)); err != nil {
return err
}
return nil
}
......@@ -62,8 +62,11 @@ func NewSSHClient(d drivers.Driver) (*ssh.Client, error) {
func Transfer(data []byte, remotedir, filename string, perm string, c *ssh.Client) error {
// Delete the old file first. This makes sure permissions get reset.
deleteCmd := fmt.Sprintf("sudo rm -f %s", filepath.Join(remotedir, filename))
if err := RunCommand(c, deleteCmd); err != nil {
return err
mkdirCmd := fmt.Sprintf("sudo mkdir -p %s", remotedir)
for _, cmd := range []string{deleteCmd, mkdirCmd} {
if err := RunCommand(c, cmd); err != nil {
return err
}
}
s, err := c.NewSession()
......
package tests
import (
"bytes"
"crypto/rand"
"crypto/rsa"
"io"
"io/ioutil"
"net"
"strconv"
......@@ -17,11 +17,13 @@ type SSHServer struct {
// Commands stores the raw commands executed against the server.
Commands []string
Connected bool
Transfers *bytes.Buffer
}
// NewSSHServer returns a NewSSHServer instance, ready for use.
func NewSSHServer() (*SSHServer, error) {
s := &SSHServer{}
s.Transfers = &bytes.Buffer{}
s.Config = &ssh.ServerConfig{
NoClientAuth: true,
}
......@@ -73,8 +75,8 @@ func (s *SSHServer) Start() (int, error) {
s.Commands = append(s.Commands, string(req.Payload))
channel.SendRequest("exit-status", false, []byte{0, 0, 0, 0})
// Discard anything that comes in over stdin.
io.Copy(ioutil.Discard, channel)
// Store anything that comes in over stdin.
io.Copy(s.Transfers, channel)
channel.Close()
}
}
......
package util
import "fmt"
const (
LocalkubeDirectory = "/var/lib/localkube"
DNSDomain = "cluster.local"
CertPath = "/var/lib/localkube/certs/"
)
func GetAlternateDNS(domain string) []string {
return []string{fmt.Sprintf("%s.%s", "kubernetes.default.svc", domain), "kubernetes.default.svc", "kubernetes.default", "kubernetes"}
}
......@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package localkube
package util
import (
"bytes"
......
......@@ -27,11 +27,11 @@ import (
)
func TestCluster(t *testing.T) {
kubectlRunner := util.NewKubectlRunner(t)
minikubeRunner := util.MinikubeRunner{BinaryPath: *binaryPath, T: t}
minikubeRunner.RunCommand("start", true)
minikubeRunner.CheckStatus("Running")
kubectlRunner := util.NewKubectlRunner(t)
cs := api.ComponentStatusList{}
kubectlRunner.RunCommand([]string{"get", "cs"}, &cs)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册