提交 9b707fe2 编写于 作者: T Thomas Stromberg

Add TeeWithPrefix to ssh_runner as well

上级 ab5d4334
......@@ -17,6 +17,7 @@ limitations under the License.
package bootstrapper
import (
"bytes"
"fmt"
"io"
"path"
......@@ -26,6 +27,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/crypto/ssh"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/util"
)
// SSHRunner runs commands through SSH.
......@@ -52,25 +54,75 @@ func (s *SSHRunner) Remove(f assets.CopyableFile) error {
return sess.Run(cmd)
}
type singleWriter struct {
b bytes.Buffer
mu sync.Mutex
}
func (w *singleWriter) Write(p []byte) (int, error) {
w.mu.Lock()
defer w.mu.Unlock()
return w.b.Write(p)
}
// teeSSH runs an SSH command, streaming stdout, stderr to logs
func teeSSH(s *ssh.Session, cmd string, outB io.Writer, errB io.Writer) error {
outPipe, err := s.StdoutPipe()
if err != nil {
return errors.Wrap(err, "stdout")
}
errPipe, err := s.StderrPipe()
if err != nil {
return errors.Wrap(err, "stderr")
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
if err := util.TeeWithPrefix(util.ErrPrefix, errPipe, errB, glog.Infof); err != nil {
glog.Errorf("tee stderr: %v", err)
}
wg.Done()
}()
go func() {
if err := util.TeeWithPrefix(util.OutPrefix, outPipe, outB, glog.Infof); err != nil {
glog.Errorf("tee stdout: %v", err)
}
wg.Done()
}()
err = s.Run(cmd)
wg.Wait()
return err
}
// Run starts a command on the remote and waits for it to return.
func (s *SSHRunner) Run(cmd string) error {
glog.Infoln("Run:", cmd)
glog.Infof("SSH: %s", cmd)
sess, err := s.c.NewSession()
defer func() {
if err := sess.Close(); err != nil {
if err != io.EOF {
glog.Errorf("close: %v", err)
}
}
}()
if err != nil {
return errors.Wrap(err, "getting ssh session")
}
defer sess.Close()
return sess.Run(cmd)
var outB bytes.Buffer
var errB bytes.Buffer
return teeSSH(sess, cmd, &outB, &errB)
}
// CombinedOutputTo runs the command and stores both command
// output and error to out.
func (s *SSHRunner) CombinedOutputTo(cmd string, out io.Writer) error {
b, err := s.CombinedOutput(cmd)
func (s *SSHRunner) CombinedOutputTo(cmd string, w io.Writer) error {
out, err := s.CombinedOutput(cmd)
if err != nil {
return errors.Wrapf(err, "running command: %s\n.", cmd)
}
_, err = out.Write([]byte(b))
_, err = w.Write([]byte(out))
return err
}
......@@ -84,9 +136,11 @@ func (s *SSHRunner) CombinedOutput(cmd string) (string, error) {
}
defer sess.Close()
b, err := sess.CombinedOutput(cmd)
var combined singleWriter
err = teeSSH(sess, cmd, &combined, &combined)
b := combined.b.Bytes()
if err != nil {
return "", errors.Wrapf(err, "running command: %s\n, output: %s", cmd, string(b))
return "", errors.Wrapf(err, "running command: %s\n, output: %s", cmd, b)
}
return string(b), nil
}
......
......@@ -17,6 +17,8 @@ limitations under the License.
package util
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
......@@ -33,6 +35,9 @@ import (
"github.com/pkg/errors"
)
const ErrPrefix = "! "
const OutPrefix = "> "
const (
downloadURL = "https://storage.googleapis.com/minikube/releases/%s/minikube-%s-amd64%s"
)
......@@ -199,3 +204,32 @@ func MaybeChownDirRecursiveToMinikubeUser(dir string) error {
}
return nil
}
// TeeWithPrefix logs new lines from a reader. Designed to be run in the background.
func TeeWithPrefix(prefix string, r io.Reader, w io.Writer, logger func(format string, args ...interface{})) error {
scanner := bufio.NewScanner(r)
// Collect individual bytes so that we don't accidentally strip newlines required by callers.
scanner.Split(bufio.ScanBytes)
var line bytes.Buffer
for scanner.Scan() {
b := scanner.Bytes()
if _, err := w.Write(b); err != nil {
return err
}
if bytes.IndexAny(b, "\r\n") == 0 {
if line.Len() > 0 {
logger("%s%s", prefix, line.String())
line.Reset()
}
continue
}
line.Write(b)
}
// Catch trailing output in case stream does not end with a newline
if line.Len() > 0 {
logger("%s%s", prefix, line.String())
}
return nil
}
......@@ -17,9 +17,13 @@ limitations under the License.
package util
import (
"bytes"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"github.com/pkg/errors"
......@@ -158,3 +162,40 @@ func TestGetBinaryDownloadURL(t *testing.T) {
}
}
func TestTeeWithPrefix(t *testing.T) {
var in bytes.Buffer
var out bytes.Buffer
var logged strings.Builder
logSink := func(format string, args ...interface{}) {
logged.WriteString("(")
logged.WriteString(fmt.Sprintf(format, args...))
logged.WriteString(")")
}
// Simulate the primary use case: tee in the background. This also helps avoid I/O races.
var wg sync.WaitGroup
wg.Add(1)
go func() {
TeeWithPrefix(": ", &in, &out, logSink)
wg.Done()
}()
in.Write([]byte("goo"))
in.Write([]byte("\n"))
in.Write([]byte("gle"))
wg.Wait()
gotBytes := out.Bytes()
wantBytes := []byte("goo\ngle")
if !bytes.Equal(gotBytes, wantBytes) {
t.Errorf("got bytes: %v, want: %v", gotBytes, wantBytes)
}
gotLog := logged.String()
wantLog := "(: goo)(: gle)"
if gotLog != wantLog {
t.Errorf("got log %q, want log %q", gotLog, wantLog)
}
}
......@@ -22,12 +22,12 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"math/rand"
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"testing"
"time"
......@@ -39,8 +39,6 @@ import (
)
const kubectlBinary = "kubectl"
const errPrefix = " !"
const outPrefix = " >"
type MinikubeRunner struct {
T *testing.T
......@@ -50,11 +48,12 @@ type MinikubeRunner struct {
Runtime string
}
// Logf writes logs to stdout if -v is set.
func Logf(str string, args ...interface{}) {
if !testing.Verbose() {
return
}
fmt.Printf("%s: ", time.Now().Format(time.Stamp))
fmt.Printf(" %s | ", time.Now().Format("15:04:05"))
fmt.Println(fmt.Sprintf(str, args...))
}
......@@ -79,53 +78,37 @@ func (m *MinikubeRunner) Remove(f assets.CopyableFile) error {
return err
}
// tee logs new lines from a reader to stdout. Designed to be callable in the background.
func tee(prefix string, f io.Reader, b strings.Builder) {
start := time.Now()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
t := scanner.Text()
b.WriteString(t + "\n")
if !testing.Verbose() {
continue
}
offset := ""
seconds := time.Since(start).Seconds()
if seconds > 1 {
offset = fmt.Sprintf("+%.fs", seconds)
}
fmt.Printf("%s%6s %s\n", prefix, offset, t)
}
}
// teeRun runs a command, streaming stdout, stderr to console
func teeRun(cmd exec.Cmd) (string, string, error) {
stderr, err := cmd.StderrPipe()
func (m *MinikubeRunner) teeRun(cmd *exec.Cmd) (string, string, error) {
errPipe, err := cmd.StderrPipe()
if err != nil {
m.T.Fatalf("stderr pipe: %v", err)
return "", "", err
}
stdout, err := cmd.StdoutPipe()
outPipe, err := cmd.StdoutPipe()
if err != nil {
m.T.Fatalf("stderr pipe: %v", err)
return "", "", err
}
start := time.Now()
cmd.Start()
var bOut strings.Builder
var bErr strings.Builder
var outB bytes.Buffer
var errB bytes.Buffer
var wg sync.WaitGroup
wg.Add(2)
go func() {
tee(errPrefix, stderr, bErr)
if err := commonutil.TeeWithPrefix(commonutil.ErrPrefix, errPipe, &errB, Logf); err != nil {
m.T.Logf("tee: %v", err)
}
wg.Done()
}
}()
go func() {
tee(outPrefix, stdout, bOut)
if err := commonutil.TeeWithPrefix(commonutil.OutPrefix, outPipe, &outB, Logf); err != nil {
m.T.Logf("tee: %v", err)
}
wg.Done()
}
err := cmd.Wait()
}()
err = cmd.Wait()
wg.Wait()
return bOut.String(), bErr.String(), err
return outB.String(), errB.String(), err
}
func (m *MinikubeRunner) RunCommand(command string, checkError bool) string {
......@@ -133,8 +116,7 @@ func (m *MinikubeRunner) RunCommand(command string, checkError bool) string {
path, _ := filepath.Abs(m.BinaryPath)
cmd := exec.Command(path, commandArr...)
Logf("Run: %s", cmd.Args)
stdout, stderr, err := teeRun(cmd)
Logf("Completed in %s, err=%v, out=%q", time.Since(start), err, bOut.String())
stdout, stderr, err := m.teeRun(cmd)
if checkError && err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
m.T.Fatalf("Error running command: %s %s. Output: %s", command, exitError.Stderr, stdout)
......@@ -142,15 +124,16 @@ func (m *MinikubeRunner) RunCommand(command string, checkError bool) string {
m.T.Fatalf("Error running command: %s %v. Output: %s", command, err, stderr)
}
}
return bOut.String()
return stdout
}
// RunWithContext calls the minikube command with a context, useful for timeouts.
func (m *MinikubeRunner) RunWithContext(ctx context.Context, command string) (stdout, stderr, error) {
func (m *MinikubeRunner) RunWithContext(ctx context.Context, command string) (string, string, error) {
commandArr := strings.Split(command, " ")
path, _ := filepath.Abs(m.BinaryPath)
cmd := exec.CommandContext(ctx, path, commandArr...)
stdout, stderr, err := teeRun(cmd)
Logf("Run: %s", cmd.Args)
return m.teeRun(cmd)
}
func (m *MinikubeRunner) RunDaemon(command string) (*exec.Cmd, *bufio.Reader) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册