提交 3a0d03f9 编写于 作者: M Matt Rickard 提交者: GitHub

Merge pull request #1844 from r2d4/command-runner

Add CommandRunner interface
......@@ -1190,26 +1190,32 @@
},
{
"ImportPath": "github.com/golang/protobuf/jsonpb",
"Comment": "list-38-g4bd1920",
"Rev": "4bd1920723d7b7c925de087aa32e2187708897f7"
},
{
"ImportPath": "github.com/golang/protobuf/proto",
"Comment": "list-38-g4bd1920",
"Rev": "4bd1920723d7b7c925de087aa32e2187708897f7"
},
{
"ImportPath": "github.com/golang/protobuf/ptypes",
"Comment": "list-38-g4bd1920",
"Rev": "4bd1920723d7b7c925de087aa32e2187708897f7"
},
{
"ImportPath": "github.com/golang/protobuf/ptypes/any",
"Comment": "list-38-g4bd1920",
"Rev": "4bd1920723d7b7c925de087aa32e2187708897f7"
},
{
"ImportPath": "github.com/golang/protobuf/ptypes/duration",
"Comment": "list-38-g4bd1920",
"Rev": "4bd1920723d7b7c925de087aa32e2187708897f7"
},
{
"ImportPath": "github.com/golang/protobuf/ptypes/timestamp",
"Comment": "list-38-g4bd1920",
"Rev": "4bd1920723d7b7c925de087aa32e2187708897f7"
},
{
......@@ -2493,6 +2499,10 @@
"ImportPath": "golang.org/x/oauth2/jwt",
"Rev": "a6bd8cefa1811bd24b86f8902872e4e8225f74c4"
},
{
"ImportPath": "golang.org/x/sync/syncmap",
"Rev": "f52d1811a62927559de87708c8913c1650ce4f26"
},
{
"ImportPath": "golang.org/x/sys/unix",
"Rev": "8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9"
......
......@@ -16,47 +16,10 @@ limitations under the License.
package config
import (
"io/ioutil"
"testing"
"github.com/docker/machine/libmachine/drivers"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/minikube/sshutil"
"k8s.io/minikube/pkg/minikube/tests"
)
import "testing"
func TestDisableUnknownAddon(t *testing.T) {
if err := Set("InvalidAddon", "false"); err == nil {
t.Fatalf("Disable did not return error for unknown addon")
}
}
func TestDeleteAddonSSH(t *testing.T) {
s, _ := tests.NewSSHServer()
port, err := s.Start()
if err != nil {
t.Fatalf("Error starting ssh server: %s", err)
}
d := &tests.MockDriver{
Port: port,
BaseDriver: drivers.BaseDriver{
IPAddress: "127.0.0.1",
SSHKeyPath: "",
},
}
dashboard := assets.Addons["dashboard"]
if err := deleteAddonSSH(dashboard, d); err != nil {
t.Fatalf("Unexpected error %s deleting addon", err)
}
// check command(s) were run
for _, addon := range dashboard.Assets {
expected, _ := ioutil.ReadFile(addon.GetAssetName())
if _, ok := s.Commands[sshutil.GetDeleteFileCommand(addon)]; !ok {
t.Fatalf("Error: Expected delete addon ssh command to be run: %s.", expected)
}
}
}
......@@ -16,49 +16,10 @@ limitations under the License.
package config
import (
"bytes"
"io/ioutil"
"testing"
"github.com/docker/machine/libmachine/drivers"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/minikube/tests"
)
import "testing"
func TestEnableUnknownAddon(t *testing.T) {
if err := Set("InvalidAddon", "false"); err == nil {
t.Fatalf("Enable did not return error for unknown addon")
}
}
func TestTransferAddonSSH(t *testing.T) {
s, _ := tests.NewSSHServer()
port, err := s.Start()
if err != nil {
t.Fatalf("Error starting ssh server: %s", err)
}
d := &tests.MockDriver{
Port: port,
BaseDriver: drivers.BaseDriver{
IPAddress: "127.0.0.1",
SSHKeyPath: "",
},
}
dashboard := assets.Addons["dashboard"]
if err := transferAddonSSH(dashboard, d); err != nil {
t.Fatalf("Unexpected error %s transferring addon", err)
}
// check contents
for _, addon := range dashboard.Assets {
expected, _ := ioutil.ReadFile(addon.GetAssetName())
transferred := s.Transfers.Bytes()
//test that custom addons are transferred properly
if !bytes.Contains(transferred, expected) {
t.Fatalf("Expected transfers to contain addon with content: %s. It was: %s", expected, transferred)
}
}
}
......@@ -19,16 +19,13 @@ package config
import (
"fmt"
"os"
"path/filepath"
"strconv"
"github.com/docker/machine/libmachine/drivers"
"github.com/pkg/errors"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/minikube/cluster"
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/machine"
"k8s.io/minikube/pkg/minikube/sshutil"
"k8s.io/minikube/pkg/minikube/storageclass"
)
......@@ -102,60 +99,26 @@ func EnableOrDisableAddon(name string, val string) error {
return err
}
host, err := cluster.CheckIfApiExistsAndLoad(api)
if enable {
if err = transferAddon(addon, host.Driver); err != nil {
return errors.Wrapf(err, "Error transferring addon %s to VM", name)
}
} else {
if err = deleteAddon(addon, host.Driver); err != nil {
return errors.Wrapf(err, "Error deleting addon %s from VM", name)
}
}
return nil
}
func deleteAddonSSH(addon *assets.Addon, d drivers.Driver) error {
client, err := sshutil.NewSSHClient(d)
if err != nil {
return err
return errors.Wrap(err, "getting host")
}
if err := sshutil.DeleteAddon(addon, client); err != nil {
return err
cmd, err := machine.GetCommandRunner(host)
if err != nil {
return errors.Wrap(err, "getting command runner")
}
return nil
}
func deleteAddon(addon *assets.Addon, d drivers.Driver) error {
if d.DriverName() == "none" {
if err := deleteAddonLocal(addon, d); err != nil {
return err
if enable {
for _, addon := range addon.Assets {
if err := cmd.Copy(addon); err != nil {
return errors.Wrapf(err, "error enabling addon %s: %s", addon.AssetName)
}
}
} else {
if err := deleteAddonSSH(addon, d); err != nil {
return err
}
}
return nil
}
func deleteAddonLocal(addon *assets.Addon, d drivers.Driver) error {
var err error
for _, f := range addon.Assets {
if err = os.Remove(filepath.Join(f.GetTargetDir(), f.GetTargetName())); err != nil {
return err
for _, addon := range addon.Assets {
if err := cmd.Remove(addon); err != nil {
return errors.Wrapf(err, "error disabling addon %s: %s", addon.AssetName)
}
}
}
return err
}
func transferAddonSSH(addon *assets.Addon, d drivers.Driver) error {
client, err := sshutil.NewSSHClient(d)
if err != nil {
return err
}
if err := sshutil.TransferAddon(addon, client); err != nil {
return err
}
return nil
}
......@@ -174,26 +137,3 @@ func EnableOrDisableDefaultStorageClass(name, val string) error {
}
return EnableOrDisableAddon(name, val)
}
func transferAddon(addon *assets.Addon, d drivers.Driver) error {
if d.DriverName() == "none" {
if err := transferAddonLocal(addon, d); err != nil {
return err
}
} else {
if err := transferAddonSSH(addon, d); err != nil {
return err
}
}
return nil
}
func transferAddonLocal(addon *assets.Addon, d drivers.Driver) error {
var err error
for _, f := range addon.Assets {
if err = assets.CopyFileLocal(f); err != nil {
return err
}
}
return err
}
......@@ -21,9 +21,11 @@ import (
"log"
"os"
"github.com/golang/glog"
"github.com/spf13/cobra"
cmdUtil "k8s.io/minikube/cmd/util"
"k8s.io/minikube/pkg/minikube/cluster"
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/machine"
)
......@@ -43,7 +45,15 @@ var logsCmd = &cobra.Command{
os.Exit(1)
}
defer api.Close()
s, err := cluster.GetHostLogs(api, follow)
h, err := api.Load(config.GetMachineName())
if err != nil {
glog.Errorln("Error getting host")
}
cmdRunner, err := machine.GetCommandRunner(h)
if err != nil {
glog.Errorln("Error getting command runner interface")
}
s, err := cluster.GetHostLogs(cmdRunner, follow)
if err != nil {
log.Println("Error getting machine logs:", err)
cmdUtil.MaybeReportErrorAndExit(err)
......
......@@ -192,21 +192,26 @@ func runStart(cmd *cobra.Command, args []string) {
glog.Errorln("Error saving profile cluster configuration: ", err)
}
cmdRunner, err := machine.GetCommandRunner(host)
if err != nil {
glog.Errorln("Error getting command runner interface")
}
fmt.Println("Moving files into cluster...")
if err := cluster.UpdateCluster(host.Driver, kubernetesConfig); err != nil {
if err := cluster.UpdateCluster(cmdRunner, kubernetesConfig); err != nil {
glog.Errorln("Error updating cluster: ", err)
cmdUtil.MaybeReportErrorAndExit(err)
}
fmt.Println("Setting up certs...")
if err := cluster.SetupCerts(host.Driver, kubernetesConfig.APIServerName, kubernetesConfig.DNSDomain); err != nil {
if err := cluster.SetupCerts(cmdRunner, kubernetesConfig); err != nil {
glog.Errorln("Error configuring authentication: ", err)
cmdUtil.MaybeReportErrorAndExit(err)
}
fmt.Println("Starting cluster components...")
if err := cluster.StartCluster(api, kubernetesConfig); err != nil {
if err := cluster.StartCluster(cmdRunner, kubernetesConfig); err != nil {
glog.Errorln("Error starting cluster: ", err)
cmdUtil.MaybeReportErrorAndExit(err)
}
......
......@@ -62,7 +62,16 @@ var statusCmd = &cobra.Command{
cs := state.None.String()
ks := state.None.String()
if ms == state.Running.String() {
cs, err = cluster.GetLocalkubeStatus(api)
h, err := api.Load(config.GetMachineName())
if err != nil {
glog.Exitln("Error getting host")
}
cmdRunner, err := machine.GetCommandRunner(h)
if err != nil {
glog.Errorln("Error getting command runner interface")
cmdUtil.MaybeReportErrorAndExit(err)
}
cs, err = cluster.GetLocalkubeStatus(cmdRunner)
if err != nil {
glog.Errorln("Error localkube status:", err)
cmdUtil.MaybeReportErrorAndExit(err)
......
......@@ -20,8 +20,6 @@ import (
"bytes"
"io"
"os"
"path/filepath"
"strconv"
"github.com/pkg/errors"
)
......@@ -164,34 +162,3 @@ func (m *BinDataAsset) GetLength() int {
func (m *BinDataAsset) Read(p []byte) (int, error) {
return m.reader.Read(p)
}
func CopyFileLocal(f CopyableFile) error {
if err := os.MkdirAll(f.GetTargetDir(), os.ModePerm); err != nil {
return errors.Wrapf(err, "error making dirs for %s", f.GetTargetDir())
}
targetPath := filepath.Join(f.GetTargetDir(), f.GetTargetName())
if _, err := os.Stat(targetPath); err == nil {
if err := os.Remove(targetPath); err != nil {
return errors.Wrapf(err, "error removing file %s", targetPath)
}
}
target, err := os.Create(targetPath)
if err != nil {
return errors.Wrapf(err, "error creating file at %s", targetPath)
}
perms, err := strconv.Atoi(f.GetPermissions())
if err != nil {
return errors.Wrapf(err, "error converting permissions %s to integer", perms)
}
if err := target.Chmod(os.FileMode(perms)); err != nil {
return errors.Wrapf(err, "error changing file permissions for %s", targetPath)
}
if _, err = io.Copy(target, f); err != nil {
return errors.Wrapf(err, `error copying file %s to target location:
do you have the correct permissions? The none driver requires sudo for the "start" command`,
targetPath)
}
return target.Close()
}
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bootstrapper
import (
"io"
"os"
"os/exec"
"path/filepath"
"strconv"
"github.com/golang/glog"
"github.com/pkg/errors"
"k8s.io/minikube/pkg/minikube/assets"
)
// ExecRunner runs commands using the os/exec package.
//
// It implements the CommandRunner interface.
type ExecRunner struct{}
// Run starts the specified command in a bash shell and waits for it to complete.
func (*ExecRunner) Run(cmd string) error {
glog.Infoln("Run:", cmd)
c := exec.Command("/bin/bash", "-c", cmd)
if err := c.Run(); err != nil {
return errors.Wrapf(err, "running command: %s", cmd)
}
return nil
}
// CombinedOutput runs the command in a bash shell and returns its
// combined standard output and standard error.
func (*ExecRunner) CombinedOutput(cmd string) (string, error) {
glog.Infoln("Run with output:", cmd)
c := exec.Command("/bin/bash", "-c", cmd)
out, err := c.CombinedOutput()
if err != nil {
return "", errors.Wrapf(err, "running command: %s\n output: %s", cmd, out)
}
return string(out), nil
}
// Copy copies a file and its permissions
func (*ExecRunner) Copy(f assets.CopyableFile) error {
if err := os.MkdirAll(f.GetTargetDir(), os.ModePerm); err != nil {
return errors.Wrapf(err, "error making dirs for %s", f.GetTargetDir())
}
targetPath := filepath.Join(f.GetTargetDir(), f.GetTargetName())
if _, err := os.Stat(targetPath); err == nil {
if err := os.Remove(targetPath); err != nil {
return errors.Wrapf(err, "error removing file %s", targetPath)
}
}
target, err := os.Create(targetPath)
if err != nil {
return errors.Wrapf(err, "error creating file at %s", targetPath)
}
perms, err := strconv.Atoi(f.GetPermissions())
if err != nil {
return errors.Wrapf(err, "error converting permissions %s to integer", perms)
}
if err := target.Chmod(os.FileMode(perms)); err != nil {
return errors.Wrapf(err, "error changing file permissions for %s", targetPath)
}
if _, err = io.Copy(target, f); err != nil {
return errors.Wrapf(err, `error copying file %s to target location:
do you have the correct permissions?`,
targetPath)
}
return target.Close()
}
// Remove removes a file
func (e *ExecRunner) Remove(f assets.CopyableFile) error {
cmd := getDeleteFileCommand(f)
return e.Run(cmd)
}
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bootstrapper
import (
"bytes"
"fmt"
"io"
"golang.org/x/sync/syncmap"
"github.com/pkg/errors"
"k8s.io/minikube/pkg/minikube/assets"
)
// FakeCommandRunner mocks command output without running the Commands
//
// It implements the CommandRunner interface and is used for testing.
type FakeCommandRunner struct {
cmdMap syncmap.Map
fileMap syncmap.Map
}
// NewFakeCommandRunner returns a new FakeCommandRunner
//
// The expected output of commands should be set with SetCommandToOutput
func NewFakeCommandRunner() *FakeCommandRunner {
return &FakeCommandRunner{}
}
// Run returns nil if output has been set for the given command text.
func (f *FakeCommandRunner) Run(cmd string) error {
_, err := f.CombinedOutput(cmd)
return err
}
// CombinedOutput returns the set output for a given command text.
func (f *FakeCommandRunner) CombinedOutput(cmd string) (string, error) {
out, ok := f.cmdMap.Load(cmd)
if !ok {
return "", fmt.Errorf("unavailable command: %s", cmd)
}
return out.(string), nil
}
// Copy adds the filename, file contents key value pair to the stored map.
func (f *FakeCommandRunner) Copy(file assets.CopyableFile) error {
var b bytes.Buffer
_, err := io.Copy(&b, file)
if err != nil {
return errors.Wrapf(err, "error reading file: %+v", file)
}
f.fileMap.Store(file.GetAssetName(), b.String())
return nil
}
// Remove removes the filename, file contents key value pair from the stored map
func (f *FakeCommandRunner) Remove(file assets.CopyableFile) error {
f.fileMap.Delete(file.GetAssetName())
return nil
}
// SetFileToContents stores the file to contents map for the FakeCommandRunner
func (f *FakeCommandRunner) SetFileToContents(fileToContents map[string]string) {
for k, v := range fileToContents {
f.fileMap.Store(k, v)
}
}
// SetCommandToOutput stores the file to contents map for the FakeCommandRunner
func (f *FakeCommandRunner) SetCommandToOutput(cmdToOutput map[string]string) {
for k, v := range cmdToOutput {
f.cmdMap.Store(k, v)
}
}
// SetFileToContents stores the file to contents map for the FakeCommandRunner
func (f *FakeCommandRunner) GetFileToContents(filename string) (string, error) {
contents, ok := f.fileMap.Load(filename)
if !ok {
return "", fmt.Errorf("unavailable file: %s", filename)
}
return contents.(string), nil
}
// DumpMaps prints out the list of stored commands and stored filenames.
func (f *FakeCommandRunner) DumpMaps(w io.Writer) {
fmt.Fprintln(w, "Commands:")
f.cmdMap.Range(func(k, v interface{}) bool {
fmt.Fprintf(w, "%s:%s", k, v)
return true
})
fmt.Fprintln(w, "Filenames: ")
f.fileMap.Range(func(k, v interface{}) bool {
fmt.Fprint(w, k)
return true
})
}
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bootstrapper
import (
"fmt"
"path/filepath"
"k8s.io/minikube/pkg/minikube/assets"
)
// CommandRunner represents an interface to run commands.
type CommandRunner interface {
// Run starts the specified command and waits for it to complete.
Run(cmd string) error
// CombinedOutput runs the command and returns its combined standard
// output and standard error.
CombinedOutput(cmd string) (string, error)
// Copy is a convenience method that runs a command to copy a file
Copy(assets.CopyableFile) error
//Remove is a convenience method that runs a command to remove a file
Remove(assets.CopyableFile) error
}
func getDeleteFileCommand(f assets.CopyableFile) string {
return fmt.Sprintf("sudo rm %s", filepath.Join(f.GetTargetDir(), f.GetTargetName()))
}
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bootstrapper
import (
"fmt"
"io"
"path/filepath"
"sync"
"github.com/golang/glog"
"github.com/pkg/errors"
"golang.org/x/crypto/ssh"
"k8s.io/minikube/pkg/minikube/assets"
)
// SSHRunner runs commands through SSH.
//
// It implements the CommandRunner interface.
type SSHRunner struct {
c *ssh.Client
}
// NewSSHRunner returns a new SSHRunner that will run commands
// through the ssh.Client provided.
func NewSSHRunner(c *ssh.Client) *SSHRunner {
return &SSHRunner{c}
}
// Remove runs a command to delete a file on the remote.
func (s *SSHRunner) Remove(f assets.CopyableFile) error {
sess, err := s.c.NewSession()
if err != nil {
return errors.Wrap(err, "getting ssh session")
}
defer sess.Close()
cmd := getDeleteFileCommand(f)
return sess.Run(cmd)
}
// Run starts a command on the remote and waits for it to return.
func (s *SSHRunner) Run(cmd string) error {
glog.Infoln("Run:", cmd)
sess, err := s.c.NewSession()
if err != nil {
return errors.Wrap(err, "getting ssh session")
}
defer sess.Close()
return sess.Run(cmd)
}
// CombinedOutput runs the command on the remote and returns its combined
// standard output and standard error.
func (s *SSHRunner) CombinedOutput(cmd string) (string, error) {
glog.Infoln("Run with output:", cmd)
sess, err := s.c.NewSession()
if err != nil {
return "", errors.Wrap(err, "getting ssh session")
}
defer sess.Close()
out, err := sess.CombinedOutput(cmd)
if err != nil {
return "", errors.Wrapf(err, "running command: %s\n output: %s", cmd, out)
}
return string(out), nil
}
// Copy copies a file to the remote over SSH.
func (s *SSHRunner) Copy(f assets.CopyableFile) error {
deleteCmd := fmt.Sprintf("sudo rm -f %s", filepath.Join(f.GetTargetDir(), f.GetTargetName()))
mkdirCmd := fmt.Sprintf("sudo mkdir -p %s", f.GetTargetDir())
for _, cmd := range []string{deleteCmd, mkdirCmd} {
if err := s.Run(cmd); err != nil {
return errors.Wrapf(err, "Error running command: %s", cmd)
}
}
sess, err := s.c.NewSession()
if err != nil {
return errors.Wrap(err, "Error creating new session via ssh client")
}
w, err := sess.StdinPipe()
if err != nil {
return errors.Wrap(err, "Error accessing StdinPipe via ssh session")
}
// The scpcmd below *should not* return until all data is copied and the
// StdinPipe is closed. But let's use a WaitGroup to make it expicit.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
defer w.Close()
header := fmt.Sprintf("C%s %d %s\n", f.GetPermissions(), f.GetLength(), f.GetTargetName())
fmt.Fprint(w, header)
io.Copy(w, f)
fmt.Fprint(w, "\x00")
}()
scpcmd := fmt.Sprintf("sudo scp -t %s", f.GetTargetDir())
if err := sess.Run(scpcmd); err != nil {
return errors.Wrapf(err, "Error running scp command: %s", scpcmd)
}
wg.Wait()
return nil
}
......@@ -17,7 +17,6 @@ limitations under the License.
package cluster
import (
"bytes"
"encoding/json"
"flag"
"fmt"
......@@ -44,9 +43,9 @@ import (
"k8s.io/client-go/tools/clientcmd/api/latest"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/minikube/bootstrapper"
cfg "k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/sshutil"
"k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/kubeconfig"
)
......@@ -156,12 +155,8 @@ func GetHostStatus(api libmachine.API) (string, error) {
}
// GetLocalkubeStatus gets the status of localkube from the host VM.
func GetLocalkubeStatus(api libmachine.API) (string, error) {
h, err := CheckIfApiExistsAndLoad(api)
if err != nil {
return "", err
}
s, err := RunCommand(h, localkubeStatusCommand, false)
func GetLocalkubeStatus(cmd bootstrapper.CommandRunner) (string, error) {
s, err := cmd.CombinedOutput(localkubeStatusCommand)
if err != nil {
return "", err
}
......@@ -194,26 +189,18 @@ func GetHostDriverIP(api libmachine.API) (net.IP, error) {
}
// StartCluster starts a k8s cluster on the specified Host.
func StartCluster(api libmachine.API, kubernetesConfig KubernetesConfig) error {
h, err := CheckIfApiExistsAndLoad(api)
if err != nil {
return errors.Wrap(err, "Error checking that api exists and loading it")
}
func StartCluster(cmd bootstrapper.CommandRunner, kubernetesConfig KubernetesConfig) error {
startCommand, err := GetStartCommand(kubernetesConfig)
if err != nil {
return errors.Wrapf(err, "Error generating start command: %s", err)
}
glog.Infoln(startCommand)
output, err := RunCommand(h, startCommand, true)
glog.Infoln(output)
if err != nil {
return errors.Wrapf(err, "Error running ssh command: %s", startCommand)
if err := cmd.Run(startCommand); err != nil {
return errors.Wrapf(err, "Error running start command: %s", startCommand)
}
return nil
}
func UpdateCluster(d drivers.Driver, config KubernetesConfig) error {
func UpdateCluster(cmd bootstrapper.CommandRunner, config KubernetesConfig) error {
copyableFiles := []assets.CopyableFile{}
var localkubeFile assets.CopyableFile
var err error
......@@ -244,24 +231,9 @@ func UpdateCluster(d drivers.Driver, config KubernetesConfig) error {
}
}
if d.DriverName() == "none" {
// transfer files to correct place on filesystem
for _, f := range copyableFiles {
if err := assets.CopyFileLocal(f); err != nil {
return err
}
}
return nil
}
// transfer files to vm via SSH
client, err := sshutil.NewSSHClient(d)
if err != nil {
return errors.Wrap(err, "Error creating new ssh client")
}
for _, f := range copyableFiles {
if err := sshutil.TransferFile(f, client); err != nil {
// fmt.Println(f.GetAssetName())
if err := cmd.Copy(f); err != nil {
return err
}
}
......@@ -274,20 +246,16 @@ func localkubeURIWasSpecified(config KubernetesConfig) bool {
}
// SetupCerts gets the generated credentials required to talk to the APIServer.
func SetupCerts(d drivers.Driver, apiServerName string, clusterDnsDomain string) error {
func SetupCerts(cmd bootstrapper.CommandRunner, k8s KubernetesConfig) error {
localPath := constants.GetMinipath()
ipStr, err := d.GetIP()
if err != nil {
return errors.Wrap(err, "Error getting ip from driver")
}
glog.Infoln("Setting up certificates for IP: %s", ipStr)
ip := net.ParseIP(k8s.NodeIP)
glog.Infoln("Setting up certificates for IP: %s", ip)
ip := net.ParseIP(ipStr)
caCert := filepath.Join(localPath, "ca.crt")
caKey := filepath.Join(localPath, "ca.key")
publicPath := filepath.Join(localPath, "apiserver.crt")
privatePath := filepath.Join(localPath, "apiserver.key")
if err := GenerateCerts(caCert, caKey, publicPath, privatePath, ip, apiServerName, clusterDnsDomain); err != nil {
if err := GenerateCerts(caCert, caKey, publicPath, privatePath, ip, k8s.APIServerName, k8s.DNSDomain); err != nil {
return errors.Wrap(err, "Error generating certs")
}
......@@ -318,37 +286,24 @@ func SetupCerts(d drivers.Driver, apiServerName string, clusterDnsDomain string)
kubeCfg := api.NewConfig()
kubeconfig.PopulateKubeConfig(kubeCfgSetup, kubeCfg)
data, err := runtime.Encode(latest.Codec, kubeCfg)
if err != nil {
return errors.Wrap(err, "setup certs: encoding kubeconfig")
}
kubeCfgFile := assets.NewMemoryAsset(data,
util.DefaultLocalkubeDirectory, "kubeconfig", "0644")
copyableFiles = append(copyableFiles, kubeCfgFile)
if d.DriverName() == "none" {
// transfer files to correct place on filesystem
for _, f := range copyableFiles {
if err := assets.CopyFileLocal(f); err != nil {
return err
}
}
return nil
}
// transfer files to vm via SSH
client, err := sshutil.NewSSHClient(d)
if err != nil {
return errors.Wrap(err, "Error creating new ssh client")
}
for _, f := range copyableFiles {
if err := sshutil.TransferFile(f, client); err != nil {
if err := cmd.Copy(f); err != nil {
return err
}
}
return nil
}
func engineOptions(config MachineConfig) *engine.Options {
o := engine.Options{
Env: config.DockerEnv,
InsecureRegistry: config.InsecureRegistry,
......@@ -447,32 +402,16 @@ func GetHostDockerEnv(api libmachine.API) (map[string]string, error) {
// GetHostLogs gets the localkube logs of the host VM.
// If follow is specified, it will tail the logs
func GetHostLogs(api libmachine.API, follow bool) (string, error) {
h, err := CheckIfApiExistsAndLoad(api)
if err != nil {
return "", errors.Wrap(err, "Error checking that api exists and loading it")
}
func GetHostLogs(cmd bootstrapper.CommandRunner, follow bool) (string, error) {
logsCommand, err := GetLogsCommand(follow)
if err != nil {
return "", errors.Wrap(err, "Error getting logs command")
}
if follow {
c, err := h.CreateSSHClient()
if err != nil {
return "", errors.Wrap(err, "Error creating ssh client")
}
err = c.Shell(logsCommand)
if err != nil {
return "", errors.Wrap(err, "error ssh shell")
}
return "", err
}
s, err := RunCommand(h, logsCommand, false)
logs, err := cmd.CombinedOutput(logsCommand)
if err != nil {
return s, err
return "", errors.Wrap(err, "running logs command")
}
return s, nil
return logs, nil
}
// MountHost runs the mount command from the 9p client on the VM to the 9p server on the host
......@@ -597,28 +536,3 @@ func EnsureMinikubeRunningOrExit(api libmachine.API, exitStatus int) {
os.Exit(exitStatus)
}
}
// RunCommand executes commands for both the local and driver implementations
func RunCommand(h *host.Host, command string, sudo bool) (string, error) {
if h.Driver.DriverName() == "none" {
cmd := exec.Command("/bin/bash", "-c", command)
if sudo {
cmd = exec.Command("sudo", "/bin/bash", "-c", command)
}
var out bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &out
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
return "", errors.Wrap(err, stderr.String())
}
return out.String(), err
}
out, err := h.RunSSHCommand(command)
if err != nil {
return "", errors.Wrap(err, string(out))
}
return string(out), err
}
......@@ -17,11 +17,6 @@ limitations under the License.
package cluster
import (
"bytes"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
......@@ -31,7 +26,7 @@ import (
"github.com/docker/machine/libmachine/host"
"github.com/docker/machine/libmachine/provision"
"github.com/docker/machine/libmachine/state"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/minikube/bootstrapper"
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/tests"
......@@ -87,73 +82,220 @@ func TestCreateHost(t *testing.T) {
}
func TestStartCluster(t *testing.T) {
api := tests.NewMockAPI()
s, _ := tests.NewSSHServer()
port, err := s.Start()
expectedStartCmd, err := GetStartCommand(KubernetesConfig{})
if err != nil {
t.Fatalf("Error starting ssh server: %s", err)
t.Fatalf("generating start command: %s", err)
}
d := &tests.MockDriver{
Port: port,
BaseDriver: drivers.BaseDriver{
IPAddress: "127.0.0.1",
SSHKeyPath: "",
cases := []struct {
description string
startCmd string
}{
{
description: "start cluster success",
startCmd: expectedStartCmd,
},
{
description: "start cluster failure",
startCmd: "something else",
},
CurrentState: state.Running,
}
api.Hosts[config.GetMachineName()] = &host.Host{Driver: d}
kubernetesConfig := KubernetesConfig{
NodeIP: "",
for _, test := range cases {
t.Run(test.description, func(t *testing.T) {
t.Parallel()
f := bootstrapper.NewFakeCommandRunner()
f.SetCommandToOutput(map[string]string{test.startCmd: "ok"})
err := StartCluster(f, KubernetesConfig{})
if err != nil && test.startCmd == expectedStartCmd {
t.Errorf("Error starting cluster: %s", err)
}
})
}
}
err = StartCluster(api, kubernetesConfig)
func TestUpdateCluster(t *testing.T) {
defaultCfg := KubernetesConfig{
KubernetesVersion: constants.DefaultKubernetesVersion,
}
defaultAddons := []string{
"deploy/addons/kube-dns/kube-dns-cm.yaml",
"deploy/addons/kube-dns/kube-dns-svc.yaml",
"deploy/addons/addon-manager.yaml",
"deploy/addons/dashboard/dashboard-rc.yaml",
"deploy/addons/dashboard/dashboard-svc.yaml",
"deploy/addons/storageclass/storageclass.yaml",
"deploy/addons/kube-dns/kube-dns-controller.yaml",
}
cases := []struct {
description string
k8s KubernetesConfig
expectedFiles []string
shouldErr bool
}{
{
description: "transfer localkube correct",
k8s: defaultCfg,
expectedFiles: []string{"out/localkube"},
},
{
description: "addons are transferred",
k8s: defaultCfg,
expectedFiles: defaultAddons,
},
{
description: "no localkube version",
k8s: KubernetesConfig{},
shouldErr: true,
},
}
if err != nil {
t.Fatalf("Error starting cluster: %s", err)
for _, test := range cases {
t.Run(test.description, func(t *testing.T) {
t.Parallel()
f := bootstrapper.NewFakeCommandRunner()
err := UpdateCluster(f, test.k8s)
if err != nil && !test.shouldErr {
t.Errorf("Error updating cluster: %s", err)
return
}
if err == nil && test.shouldErr {
t.Error("Didn't get error, but expected to")
return
}
for _, expectedFile := range test.expectedFiles {
_, err := f.GetFileToContents(expectedFile)
if err != nil {
t.Errorf("Expected file %s, but was not present", expectedFile)
}
}
})
}
}
startCommand, err := GetStartCommand(kubernetesConfig)
if err != nil {
t.Fatalf("Error getting start command: %s", err)
func TestGetLocalkubeStatus(t *testing.T) {
cases := []struct {
description string
statusCmdMap map[string]string
expectedStatus string
shouldErr bool
}{
{
description: "get status running",
statusCmdMap: map[string]string{localkubeStatusCommand: "Running"},
expectedStatus: "Running",
},
{
description: "get status stopped",
statusCmdMap: map[string]string{localkubeStatusCommand: "Stopped"},
expectedStatus: "Stopped",
},
{
description: "get status unknown status",
statusCmdMap: map[string]string{localkubeStatusCommand: "Recalculating..."},
shouldErr: true,
},
{
description: "get status error",
statusCmdMap: map[string]string{"a": "b"},
shouldErr: true,
},
}
for _, cmd := range []string{startCommand} {
if _, ok := s.Commands[cmd]; !ok {
t.Fatalf("Expected command not run: %s. Commands run: %v", cmd, s.Commands)
}
for _, test := range cases {
t.Run(test.description, func(t *testing.T) {
t.Parallel()
f := bootstrapper.NewFakeCommandRunner()
f.SetCommandToOutput(test.statusCmdMap)
actualStatus, err := GetLocalkubeStatus(f)
if err != nil && !test.shouldErr {
t.Errorf("Error getting localkube status: %s", err)
return
}
if err == nil && test.shouldErr {
t.Error("Didn't get error, but expected to")
return
}
if test.expectedStatus != actualStatus {
t.Errorf("Expected status: %s, Actual status: %s", test.expectedStatus, actualStatus)
}
})
}
}
func TestStartClusterError(t *testing.T) {
api := tests.NewMockAPI()
s, _ := tests.NewSSHServer()
port, err := s.Start()
func TestGetHostLogs(t *testing.T) {
logs, err := GetLogsCommand(false)
if err != nil {
t.Fatalf("Error starting ssh server: %s", err)
t.Fatalf("Error getting logs command: %s", err)
}
logsf, err := GetLogsCommand(true)
if err != nil {
t.Fatalf("Error gettings logs -f command: %s", err)
}
d := &tests.MockDriver{
Port: port,
BaseDriver: drivers.BaseDriver{
IPAddress: "127.0.0.1",
SSHKeyPath: "",
cases := []struct {
description string
logsCmdMap map[string]string
follow bool
shouldErr bool
}{
{
description: "get logs correct",
logsCmdMap: map[string]string{logs: "fee"},
},
{
description: "follow logs correct",
logsCmdMap: map[string]string{logsf: "fi"},
follow: true,
},
{
description: "get logs incorrect",
logsCmdMap: map[string]string{"fo": "fum"},
shouldErr: true,
},
CurrentState: state.Running,
HostError: true,
}
api.Hosts[config.GetMachineName()] = &host.Host{Driver: d}
kubernetesConfig := KubernetesConfig{
NodeIP: "192",
for _, test := range cases {
t.Run(test.description, func(t *testing.T) {
t.Parallel()
f := bootstrapper.NewFakeCommandRunner()
f.SetCommandToOutput(test.logsCmdMap)
_, err := GetHostLogs(f, test.follow)
if err != nil && !test.shouldErr {
t.Errorf("Error getting localkube logs: %s", err)
return
}
if err == nil && test.shouldErr {
t.Error("Didn't get error, but expected to")
return
}
})
}
}
func TestSetupCerts(t *testing.T) {
tempDir := tests.MakeTempDir()
defer os.RemoveAll(tempDir)
err = StartCluster(api, kubernetesConfig)
f := bootstrapper.NewFakeCommandRunner()
k8s := KubernetesConfig{
APIServerName: constants.APIServerName,
DNSDomain: constants.ClusterDNSDomain,
}
if err == nil {
t.Fatal("Error not thrown starting cluster.")
var filesToBeTransferred []string
for _, cert := range certs {
filesToBeTransferred = append(filesToBeTransferred, filepath.Join(constants.GetMinipath(), cert))
}
if err := SetupCerts(f, k8s); err != nil {
t.Fatalf("Error starting cluster: %s", err)
}
for _, cert := range filesToBeTransferred {
_, err := f.GetFileToContents(cert)
if err != nil {
t.Errorf("Cert not generated: %s", cert)
}
}
}
......@@ -378,77 +520,6 @@ func TestGetHostStatus(t *testing.T) {
checkState(state.Stopped.String())
}
func TestGetLocalkubeStatus(t *testing.T) {
api := tests.NewMockAPI()
s, _ := tests.NewSSHServer()
port, err := s.Start()
if err != nil {
t.Fatalf("Error starting ssh server: %s", err)
}
d := &tests.MockDriver{
Port: port,
BaseDriver: drivers.BaseDriver{
IPAddress: "127.0.0.1",
SSHKeyPath: "",
},
}
api.Hosts[config.GetMachineName()] = &host.Host{Driver: d}
s.SetCommandToOutput(map[string]string{
localkubeStatusCommand: state.Running.String(),
})
if _, err := GetLocalkubeStatus(api); err != nil {
t.Fatalf("Error getting localkube status: %s", err)
}
s.SetCommandToOutput(map[string]string{
localkubeStatusCommand: state.Stopped.String(),
})
if _, err := GetLocalkubeStatus(api); err != nil {
t.Fatalf("Error getting localkube status: %s", err)
}
s.SetCommandToOutput(map[string]string{
localkubeStatusCommand: "Bad Output",
})
if _, err := GetLocalkubeStatus(api); err == nil {
t.Fatalf("Expected error in getting localkube status as ssh returned bad output")
}
}
func TestSetupCerts(t *testing.T) {
s, _ := tests.NewSSHServer()
port, err := s.Start()
if err != nil {
t.Fatalf("Error starting ssh server: %s", err)
}
d := &tests.MockDriver{
Port: port,
BaseDriver: drivers.BaseDriver{
IPAddress: "127.0.0.1",
SSHKeyPath: "",
},
}
tempDir := tests.MakeTempDir()
defer os.RemoveAll(tempDir)
if err := SetupCerts(d, constants.APIServerName, constants.ClusterDNSDomain); err != nil {
t.Fatalf("Error starting cluster: %s", err)
}
for _, cert := range certs {
contents, _ := ioutil.ReadFile(cert)
transferred := s.Transfers.Bytes()
if !bytes.Contains(transferred, contents) {
t.Fatalf("Certificate not copied. Expected transfers to contain: %s. It was: %s", contents, transferred)
}
}
}
func TestGetHostDockerEnv(t *testing.T) {
tempDir := tests.MakeTempDir()
defer os.RemoveAll(tempDir)
......@@ -510,54 +581,6 @@ func TestGetHostDockerEnvIPv6(t *testing.T) {
}
}
func TestHostGetLogs(t *testing.T) {
api := tests.NewMockAPI()
s, _ := tests.NewSSHServer()
port, err := s.Start()
if err != nil {
t.Fatalf("Error starting ssh server: %s", err)
}
d := &tests.MockDriver{
Port: port,
BaseDriver: drivers.BaseDriver{
IPAddress: "127.0.0.1",
SSHKeyPath: "",
},
}
api.Hosts[config.GetMachineName()] = &host.Host{Driver: d}
tests := []struct {
description string
follow bool
}{
{
description: "logs",
follow: false,
},
{
description: "logs -f",
follow: true,
},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
cmd, err := GetLogsCommand(test.follow)
if err != nil {
t.Errorf("Error getting the logs command: %s", err)
}
if _, err = GetHostLogs(api, test.follow); err != nil {
t.Errorf("Error getting host logs: %s", err)
}
if _, ok := s.Commands[cmd]; !ok {
t.Errorf("Expected command to run but did not: %s", cmd)
}
})
}
}
func TestCreateSSHShell(t *testing.T) {
api := tests.NewMockAPI()
......@@ -587,93 +610,6 @@ func TestCreateSSHShell(t *testing.T) {
}
}
func TestUpdateDefault(t *testing.T) {
s, _ := tests.NewSSHServer()
port, err := s.Start()
if err != nil {
t.Fatalf("Error starting ssh server: %s", err)
}
d := &tests.MockDriver{
Port: port,
BaseDriver: drivers.BaseDriver{
IPAddress: "127.0.0.1",
SSHKeyPath: "",
},
}
kubernetesConfig := KubernetesConfig{
KubernetesVersion: constants.DefaultKubernetesVersion,
}
if err := UpdateCluster(d, kubernetesConfig); err != nil {
t.Fatalf("Error updating cluster: %s", err)
}
transferred := s.Transfers.Bytes()
for _, addonBundle := range assets.Addons {
if isEnabled, err := addonBundle.IsEnabled(); err == nil && isEnabled {
for _, addon := range addonBundle.Assets {
contents, _ := assets.Asset(addon.GetAssetName())
if !bytes.Contains(transferred, contents) {
t.Fatalf("File not copied. Expected transfers to contain: %s. It was: %s", contents, transferred)
}
}
} else if err != nil {
t.Fatalf("File not copied. Unexpected error while attempting to check transferred addons: %s", err)
}
}
//test that localkube is transferred properly
contents, _ := assets.Asset("out/localkube")
if !bytes.Contains(transferred, contents) {
t.Fatalf("File not copied. Expected transfers to contain: %s. It was: %s", contents, transferred)
}
}
var testLocalkubeBin = "hello"
type K8sVersionHandlerCorrect struct{}
func (h *K8sVersionHandlerCorrect) ServeHTTP(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, testLocalkubeBin)
}
func TestUpdateKubernetesVersion(t *testing.T) {
tempDir := tests.MakeTempDir()
defer os.RemoveAll(tempDir)
s, _ := tests.NewSSHServer()
port, err := s.Start()
if err != nil {
t.Fatalf("Error starting ssh server: %s", err)
}
d := &tests.MockDriver{
Port: port,
BaseDriver: drivers.BaseDriver{
IPAddress: "127.0.0.1",
SSHKeyPath: "",
},
}
handler := &K8sVersionHandlerCorrect{}
server := httptest.NewServer(handler)
kubernetesConfig := KubernetesConfig{
KubernetesVersion: server.URL,
}
if err := UpdateCluster(d, kubernetesConfig); err != nil {
t.Fatalf("Error updating cluster: %s", err)
}
transferred := s.Transfers.Bytes()
//test that localkube is transferred properly
contents := []byte(testLocalkubeBin)
if !bytes.Contains(transferred, contents) {
t.Fatalf("File not copied. Expected transfers to contain: %s. It was: %s", contents, transferred)
}
}
func TestIsLocalkubeCached(t *testing.T) {
tempDir := tests.MakeTempDir()
defer os.RemoveAll(tempDir)
......@@ -716,56 +652,3 @@ func TestIsLocalkubeCached(t *testing.T) {
inner(input)
}
}
func TestUpdateCustomAddons(t *testing.T) {
tempDir := tests.MakeTempDir()
os.Mkdir(constants.MakeMiniPath("addons", "subdir"), 0777)
defer os.RemoveAll(tempDir)
s, _ := tests.NewSSHServer()
port, err := s.Start()
if err != nil {
t.Fatalf("Error starting ssh server: %s", err)
}
d := &tests.MockDriver{
Port: port,
BaseDriver: drivers.BaseDriver{
IPAddress: "127.0.0.1",
SSHKeyPath: "",
},
}
//write a file into ~/.minikube/addons
path := filepath.Join(constants.MakeMiniPath("addons"), "dir-addon.yaml")
testContent1 := []byte("CUSTOM ADDON TEST STRING#1, In Addons Dir")
err = ioutil.WriteFile(path, testContent1, 0644)
if err != nil {
t.Fatalf("Error writing custom addon: %s", err)
}
path = filepath.Join(constants.MakeMiniPath("addons", "subdir"), "subdir-addon.yaml")
testContent2 := []byte("CUSTOM ADDON TEST STRING#2, In Addons SubDir")
err = ioutil.WriteFile(path, testContent2, 0644)
if err != nil {
t.Fatalf("Error writing custom addon: %s", err)
}
//run update
kubernetesConfig := KubernetesConfig{
KubernetesVersion: constants.DefaultKubernetesVersion,
}
if err := UpdateCluster(d, kubernetesConfig); err != nil {
t.Fatalf("Error updating cluster: %s", err)
}
transferred := s.Transfers.Bytes()
//test that custom addons are transferred properly
if !bytes.Contains(transferred, testContent1) {
t.Fatalf("Custom addon not copied. Expected transfers to contain custom addon with content: %s. It was: %s", testContent1, transferred)
}
if !bytes.Contains(transferred, testContent2) {
t.Fatalf("Custom addon not copied. Expected transfers to contain custom addon with content: %s. It was: %s", testContent2, transferred)
}
}
......@@ -143,3 +143,4 @@ const (
)
const IsMinikubeChildProcess = "IS_MINIKUBE_CHILD_PROCESS"
const DriverNone = "none"
......@@ -25,7 +25,9 @@ import (
"path/filepath"
"time"
"k8s.io/minikube/pkg/minikube/bootstrapper"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/sshutil"
"k8s.io/minikube/pkg/provision"
"github.com/docker/machine/drivers/virtualbox"
......@@ -157,6 +159,18 @@ func (api *LocalClient) Load(name string) (*host.Host, error) {
return h, nil
}
func GetCommandRunner(h *host.Host) (bootstrapper.CommandRunner, error) {
if h.DriverName != constants.DriverNone {
client, err := sshutil.NewSSHClient(h.Driver)
if err != nil {
return nil, errors.Wrap(err, "getting ssh client for bootstrapper")
}
return bootstrapper.NewSSHRunner(client), nil
}
return &bootstrapper.ExecRunner{}, nil
}
func (api *LocalClient) Close() error {
if api.legacyClient != nil {
return api.legacyClient.Close()
......
......@@ -17,29 +17,15 @@ limitations under the License.
package sshutil
import (
"fmt"
"io"
"net"
"path/filepath"
"strconv"
"sync"
"github.com/docker/machine/libmachine/drivers"
machinessh "github.com/docker/machine/libmachine/ssh"
"github.com/pkg/errors"
"golang.org/x/crypto/ssh"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/util"
)
// SSHSession provides methods for running commands on a host.
type SSHSession interface {
Close() error
StdinPipe() (io.WriteCloser, error)
Run(cmd string) error
Wait() error
}
// NewSSHClient returns an SSH client object for running commands.
func NewSSHClient(d drivers.Driver) (*ssh.Client, error) {
h, err := newSSHHost(d)
......@@ -63,84 +49,6 @@ func NewSSHClient(d drivers.Driver) (*ssh.Client, error) {
return client, nil
}
func DeleteAddon(a *assets.Addon, client *ssh.Client) error {
m := util.MultiError{}
for _, f := range a.Assets {
if err := DeleteFile(f, client); err != nil {
m.Collect(err)
}
}
return m.ToError()
}
func TransferAddon(a *assets.Addon, client *ssh.Client) error {
m := util.MultiError{}
for _, f := range a.Assets {
if err := TransferFile(f, client); err != nil {
m.Collect(err)
}
}
return m.ToError()
}
func TransferFile(f assets.CopyableFile, client *ssh.Client) error {
return Transfer(f, f.GetLength(),
f.GetTargetDir(), f.GetTargetName(),
f.GetPermissions(), client)
}
// Transfer uses an SSH session to copy a file to the remote machine.
func Transfer(reader io.Reader, readerLen int, remotedir, filename string, perm string, c *ssh.Client) error {
// Delete the old file first. This makes sure permissions get reset.
deleteCmd := fmt.Sprintf("sudo rm -f %s", filepath.Join(remotedir, filename))
mkdirCmd := fmt.Sprintf("sudo mkdir -p %s", remotedir)
for _, cmd := range []string{deleteCmd, mkdirCmd} {
if err := RunCommand(c, cmd); err != nil {
return errors.Wrapf(err, "Error running command: %s", cmd)
}
}
s, err := c.NewSession()
if err != nil {
return errors.Wrap(err, "Error creating new session via ssh client")
}
w, err := s.StdinPipe()
if err != nil {
return errors.Wrap(err, "Error accessing StdinPipe via ssh session")
}
// The scpcmd below *should not* return until all data is copied and the
// StdinPipe is closed. But let's use a WaitGroup to make it expicit.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
defer w.Close()
header := fmt.Sprintf("C%s %d %s\n", perm, readerLen, filename)
fmt.Fprint(w, header)
io.Copy(w, reader)
fmt.Fprint(w, "\x00")
}()
scpcmd := fmt.Sprintf("sudo scp -t %s", remotedir)
if err := s.Run(scpcmd); err != nil {
return errors.Wrap(err, "Error running scp command")
}
wg.Wait()
return nil
}
func RunCommand(c *ssh.Client, cmd string) error {
s, err := c.NewSession()
defer s.Close()
if err != nil {
return errors.Wrap(err, "Error creating new session for ssh client")
}
return s.Run(cmd)
}
type sshHost struct {
IP string
Port int
......@@ -165,11 +73,3 @@ func newSSHHost(d drivers.Driver) (*sshHost, error) {
Username: d.GetSSHUsername(),
}, nil
}
func DeleteFile(f assets.CopyableFile, client *ssh.Client) error {
return RunCommand(client, GetDeleteFileCommand(f))
}
func GetDeleteFileCommand(f assets.CopyableFile) string {
return fmt.Sprintf("sudo rm %s", filepath.Join(f.GetTargetDir(), f.GetTargetName()))
}
......@@ -17,7 +17,6 @@ limitations under the License.
package sshutil
import (
"bytes"
"testing"
"github.com/docker/machine/libmachine/drivers"
......@@ -44,7 +43,15 @@ func TestNewSSHClient(t *testing.T) {
}
cmd := "foo"
RunCommand(c, cmd)
sess, err := c.NewSession()
defer sess.Close()
if err != nil {
t.Fatal("Error creating new session for ssh client")
}
if err := sess.Run(cmd); err != nil {
t.Fatalf("Error running command: %s", cmd)
}
if !s.Connected {
t.Fatalf("Error!")
}
......@@ -90,28 +97,3 @@ func TestNewSSHHostError(t *testing.T) {
t.Fatal("Expected error creating host, got nil")
}
}
func TestTransfer(t *testing.T) {
s, _ := tests.NewSSHServer()
port, err := s.Start()
if err != nil {
t.Fatalf("Error starting ssh server: %s", err)
}
d := &tests.MockDriver{
Port: port,
BaseDriver: drivers.BaseDriver{
IPAddress: "127.0.0.1",
SSHKeyPath: "",
},
}
c, err := NewSSHClient(d)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
dest := "bar"
contents := []byte("testcontents")
if err := Transfer(bytes.NewReader(contents), len(contents), "/tmp", dest, "0777", c); err != nil {
t.Fatalf("Unexpected error: %s", err)
}
}
......@@ -36,6 +36,7 @@ import (
"github.com/docker/machine/libmachine/swarm"
"github.com/pkg/errors"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/minikube/bootstrapper"
"k8s.io/minikube/pkg/minikube/sshutil"
"k8s.io/minikube/pkg/util"
)
......@@ -190,6 +191,7 @@ func configureAuth(p *BuildrootProvisioner) error {
return errors.Wrap(err, "error getting ip during provisioning")
}
execRunner := &bootstrapper.ExecRunner{}
hostCerts := map[string]string{
authOptions.CaCertPath: filepath.Join(authOptions.StorePath, "ca.pem"),
authOptions.ClientCertPath: filepath.Join(authOptions.StorePath, "cert.pem"),
......@@ -201,7 +203,7 @@ func configureAuth(p *BuildrootProvisioner) error {
if err != nil {
return errors.Wrapf(err, "open cert file: %s", src)
}
if err := assets.CopyFileLocal(f); err != nil {
if err := execRunner.Copy(f); err != nil {
return errors.Wrapf(err, "transferring file: %+v", f)
}
}
......@@ -240,13 +242,13 @@ func configureAuth(p *BuildrootProvisioner) error {
if err != nil {
return errors.Wrap(err, "provisioning: error getting ssh client")
}
sshRunner := bootstrapper.NewSSHRunner(sshClient)
for src, dst := range remoteCerts {
f, err := assets.NewFileAsset(src, filepath.Dir(dst), filepath.Base(dst), "0640")
if err != nil {
return errors.Wrapf(err, "error copying %s to %s", src, dst)
}
if err := sshutil.TransferFile(f, sshClient); err != nil {
if err := sshRunner.Copy(f); err != nil {
return errors.Wrapf(err, "transfering file to machine %v", f)
}
}
......
# This source code refers to The Go Authors for copyright purposes.
# The master list of authors is in the main Go distribution,
# visible at http://tip.golang.org/AUTHORS.
# This source code was written by the Go contributors.
# The master list of contributors is in the main Go distribution,
# visible at http://tip.golang.org/CONTRIBUTORS.
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the Go project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package syncmap provides a concurrent map implementation.
// It is a prototype for a proposed addition to the sync package
// in the standard library.
// (https://golang.org/issue/18177)
package syncmap
import (
"sync"
"sync/atomic"
"unsafe"
)
// Map is a concurrent map with amortized-constant-time loads, stores, and deletes.
// It is safe for multiple goroutines to call a Map's methods concurrently.
//
// The zero Map is valid and empty.
//
// A Map must not be copied after first use.
type Map struct {
mu sync.Mutex
// read contains the portion of the map's contents that are safe for
// concurrent access (with or without mu held).
//
// The read field itself is always safe to load, but must only be stored with
// mu held.
//
// Entries stored in read may be updated concurrently without mu, but updating
// a previously-expunged entry requires that the entry be copied to the dirty
// map and unexpunged with mu held.
read atomic.Value // readOnly
// dirty contains the portion of the map's contents that require mu to be
// held. To ensure that the dirty map can be promoted to the read map quickly,
// it also includes all of the non-expunged entries in the read map.
//
// Expunged entries are not stored in the dirty map. An expunged entry in the
// clean map must be unexpunged and added to the dirty map before a new value
// can be stored to it.
//
// If the dirty map is nil, the next write to the map will initialize it by
// making a shallow copy of the clean map, omitting stale entries.
dirty map[interface{}]*entry
// misses counts the number of loads since the read map was last updated that
// needed to lock mu to determine whether the key was present.
//
// Once enough misses have occurred to cover the cost of copying the dirty
// map, the dirty map will be promoted to the read map (in the unamended
// state) and the next store to the map will make a new dirty copy.
misses int
}
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
m map[interface{}]*entry
amended bool // true if the dirty map contains some key not in m.
}
// expunged is an arbitrary pointer that marks entries which have been deleted
// from the dirty map.
var expunged = unsafe.Pointer(new(interface{}))
// An entry is a slot in the map corresponding to a particular key.
type entry struct {
// p points to the interface{} value stored for the entry.
//
// If p == nil, the entry has been deleted and m.dirty == nil.
//
// If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
// is missing from m.dirty.
//
// Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
// != nil, in m.dirty[key].
//
// An entry can be deleted by atomic replacement with nil: when m.dirty is
// next created, it will atomically replace nil with expunged and leave
// m.dirty[key] unset.
//
// An entry's associated value can be updated by atomic replacement, provided
// p != expunged. If p == expunged, an entry's associated value can be updated
// only after first setting m.dirty[key] = e so that lookups using the dirty
// map find the entry.
p unsafe.Pointer // *interface{}
}
func newEntry(i interface{}) *entry {
return &entry{p: unsafe.Pointer(&i)}
}
// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// Avoid reporting a spurious miss if m.dirty got promoted while we were
// blocked on m.mu. (If further loads of the same key will not miss, it's
// not worth copying the dirty map for this key.)
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
// tryStore stores a value if the entry has not been expunged.
//
// If the entry is expunged, tryStore returns false and leaves the entry
// unchanged.
func (e *entry) tryStore(i *interface{}) bool {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
for {
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
}
}
// unexpungeLocked ensures that the entry is not marked as expunged.
//
// If the entry was previously expunged, it must be added to the dirty map
// before m.mu is unlocked.
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
// storeLocked unconditionally stores a value to the entry.
//
// The entry must be known not to be expunged.
func (e *entry) storeLocked(i *interface{}) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
// LoadOrStore returns the existing value for the key if present.
// Otherwise, it stores and returns the given value.
// The loaded result is true if the value was loaded, false if stored.
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
// Avoid locking if it's a clean hit.
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()
return actual, loaded
}
// tryLoadOrStore atomically loads or stores a value if the entry is not
// expunged.
//
// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and
// returns with ok==false.
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
// Copy the interface after the first load to make this method more amenable
// to escape analysis: if we hit the "load" path or the entry is expunged, we
// shouldn't bother heap-allocating.
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
}
}
// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
//
// Range does not necessarily correspond to any consistent snapshot of the Map's
// contents: no key will be visited more than once, but if the value for any key
// is stored or deleted concurrently, Range may reflect any mapping for that key
// from any point during the Range call.
//
// Range may be O(N) with the number of elements in the map even if f returns
// false after a constant number of calls.
func (m *Map) Range(f func(key, value interface{}) bool) {
// We need to be able to iterate over all of the keys that were already
// present at the start of the call to Range.
// If read.amended is false, then read.m satisfies that property without
// requiring us to hold m.mu for a long time.
read, _ := m.read.Load().(readOnly)
if read.amended {
// m.dirty contains keys not in read.m. Fortunately, Range is already O(N)
// (assuming the caller does not break out early), so a call to Range
// amortizes an entire copy of the map: we can promote the dirty copy
// immediately!
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册