提交 45eaef24 编写于 作者: J Janoš Guljaš 提交者: Balint Gabor

cmd/swarm: solve rare cases of using the same random port in tests (#17352)

上级 3bcb501c
......@@ -20,6 +20,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"os"
"os/exec"
"testing"
......@@ -559,3 +560,16 @@ func TestValidateConfig(t *testing.T) {
}
}
}
func assignTCPPort() (string, error) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return "", err
}
l.Close()
_, port, err := net.SplitHostPort(l.Addr().String())
if err != nil {
return "", err
}
return port, nil
}
......@@ -17,12 +17,15 @@
package main
import (
"context"
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"runtime"
"sync"
"syscall"
"testing"
"time"
......@@ -218,14 +221,12 @@ func existingTestNode(t *testing.T, dir string, bzzaccount string) *testNode {
}
// assign ports
httpPort, err := assignTCPPort()
if err != nil {
t.Fatal(err)
}
p2pPort, err := assignTCPPort()
ports, err := getAvailableTCPPorts(2)
if err != nil {
t.Fatal(err)
}
p2pPort := ports[0]
httpPort := ports[1]
// start the node
node.Cmd = runSwarm(t,
......@@ -246,6 +247,17 @@ func existingTestNode(t *testing.T, dir string, bzzaccount string) *testNode {
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// ensure that all ports have active listeners
// so that the next node will not get the same
// when calling getAvailableTCPPorts
err = waitTCPPorts(ctx, ports...)
if err != nil {
t.Fatal(err)
}
// wait for the node to start
for start := time.Now(); time.Since(start) < 10*time.Second; time.Sleep(50 * time.Millisecond) {
node.Client, err = rpc.Dial(conf.IPCEndpoint())
......@@ -280,14 +292,12 @@ func newTestNode(t *testing.T, dir string) *testNode {
node := &testNode{Dir: dir}
// assign ports
httpPort, err := assignTCPPort()
if err != nil {
t.Fatal(err)
}
p2pPort, err := assignTCPPort()
ports, err := getAvailableTCPPorts(2)
if err != nil {
t.Fatal(err)
}
p2pPort := ports[0]
httpPort := ports[1]
// start the node
node.Cmd = runSwarm(t,
......@@ -308,6 +318,17 @@ func newTestNode(t *testing.T, dir string) *testNode {
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// ensure that all ports have active listeners
// so that the next node will not get the same
// when calling getAvailableTCPPorts
err = waitTCPPorts(ctx, ports...)
if err != nil {
t.Fatal(err)
}
// wait for the node to start
for start := time.Now(); time.Since(start) < 10*time.Second; time.Sleep(50 * time.Millisecond) {
node.Client, err = rpc.Dial(conf.IPCEndpoint())
......@@ -343,15 +364,92 @@ func (n *testNode) Shutdown() {
}
}
func assignTCPPort() (string, error) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return "", err
// getAvailableTCPPorts returns a set of ports that
// nothing is listening on at the time.
//
// Function assignTCPPort cannot be called in sequence
// and guardantee that the same port will be returned in
// different calls as the listener is closed within the function,
// not after all listeners are started and selected unique
// available ports.
func getAvailableTCPPorts(count int) (ports []string, err error) {
for i := 0; i < count; i++ {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, err
}
// defer close in the loop to be sure the same port will not
// be selected in the next iteration
defer l.Close()
_, port, err := net.SplitHostPort(l.Addr().String())
if err != nil {
return nil, err
}
ports = append(ports, port)
}
l.Close()
_, port, err := net.SplitHostPort(l.Addr().String())
if err != nil {
return "", err
return ports, nil
}
// waitTCPPorts blocks until tcp connections can be
// established on all provided ports. It runs all
// ports dialers in parallel, and returns the first
// encountered error.
// See waitTCPPort also.
func waitTCPPorts(ctx context.Context, ports ...string) error {
var err error
// mu locks err variable that is assigned in
// other goroutines
var mu sync.Mutex
// cancel is canceling all goroutines
// when the firs error is returned
// to prevent unnecessary waiting
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
for _, port := range ports {
wg.Add(1)
go func(port string) {
defer wg.Done()
e := waitTCPPort(ctx, port)
mu.Lock()
defer mu.Unlock()
if e != nil && err == nil {
err = e
cancel()
}
}(port)
}
wg.Wait()
return err
}
// waitTCPPort blocks until tcp connection can be established
// ona provided port. It has a 3 minute timeout as maximum,
// to prevent long waiting, but it can be shortened with
// a provided context instance. Dialer has a 10 second timeout
// in every iteration, and connection refused error will be
// retried in 100 milliseconds periods.
func waitTCPPort(ctx context.Context, port string) error {
ctx, cancel := context.WithTimeout(ctx, 3*time.Minute)
defer cancel()
for {
c, err := (&net.Dialer{Timeout: 10 * time.Second}).DialContext(ctx, "tcp", "127.0.0.1:"+port)
if err != nil {
if operr, ok := err.(*net.OpError); ok {
if syserr, ok := operr.Err.(*os.SyscallError); ok && syserr.Err == syscall.ECONNREFUSED {
time.Sleep(100 * time.Millisecond)
continue
}
}
return err
}
return c.Close()
}
return port, nil
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册