Commit 90b94e64 authored by Jeffrey Wilcke

Merge pull request #971 from fjl/p2p-limit-tweaks

p2p: tweak connection limits
@@ -17,6 +17,7 @@ import (
     "github.com/syndtr/goleveldb/leveldb"
     "github.com/syndtr/goleveldb/leveldb/errors"
     "github.com/syndtr/goleveldb/leveldb/iterator"
+    "github.com/syndtr/goleveldb/leveldb/opt"
     "github.com/syndtr/goleveldb/leveldb/storage"
     "github.com/syndtr/goleveldb/leveldb/util"
 )
@@ -72,8 +73,8 @@ func newMemoryNodeDB() (*nodeDB, error) {
 // newPersistentNodeDB creates/opens a leveldb backed persistent node database,
 // also flushing its contents in case of a version mismatch.
 func newPersistentNodeDB(path string, version int) (*nodeDB, error) {
-    // Try to open the cache, recovering any corruption
-    db, err := leveldb.OpenFile(path, nil)
+    opts := &opt.Options{OpenFilesCacheCapacity: 5}
+    db, err := leveldb.OpenFile(path, opts)
     if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
         db, err = leveldb.RecoverFile(path, nil)
     }
......
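The only functional change in this file is the options literal: OpenFilesCacheCapacity caps how many table files goleveldb keeps open at once, so the node database does not hoard file descriptors. A minimal standalone sketch of the same open-with-options / recover-on-corruption pattern follows; the openNodeDB name and the /tmp/nodes path are illustrative, not part of the commit:

package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/opt"
)

// openNodeDB opens a LevelDB store while keeping at most five table files
// open at once, recovering the store if it turns out to be corrupted.
func openNodeDB(path string) (*leveldb.DB, error) {
	opts := &opt.Options{OpenFilesCacheCapacity: 5}
	db, err := leveldb.OpenFile(path, opts)
	if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
		db, err = leveldb.RecoverFile(path, nil)
	}
	return db, err
}

func main() {
	db, err := openNodeDB("/tmp/nodes") // placeholder path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}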
@@ -25,7 +25,7 @@ const (
     hashBits = len(common.Hash{}) * 8
     nBuckets = hashBits + 1 // Number of buckets

-    maxBondingPingPongs = 10
+    maxBondingPingPongs = 16
 )

 type Table struct {
......
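maxBondingPingPongs bounds how many ping/pong bonding exchanges may be in flight at the same time; the hunk only raises the number from 10 to 16. A common way such a cap is enforced in Go is a buffered channel used as a counting semaphore. The sketch below illustrates that pattern with made-up names (slots, pingPong); it is not the table's actual implementation:

package main

import (
	"fmt"
	"sync"
	"time"
)

const maxBondingPingPongs = 16 // cap on concurrent ping/pong exchanges

// pingPong stands in for a real bonding exchange with a remote node.
func pingPong(id int) {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("bonded with peer", id)
}

func main() {
	// A buffered channel acts as a counting semaphore: acquiring a slot
	// blocks once maxBondingPingPongs exchanges are already running.
	slots := make(chan struct{}, maxBondingPingPongs)

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			slots <- struct{}{}        // acquire a bonding slot
			defer func() { <-slots }() // release it when done
			pingPong(id)
		}(i)
	}
	wg.Wait()
}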
@@ -131,10 +131,11 @@ func (p *Peer) run() DiscReason {
     case err := <-p.protoErr:
         reason = discReasonForError(err)
     case reason = <-p.disc:
+        p.politeDisconnect(reason)
+        reason = DiscRequested
     }

     close(p.closed)
-    p.politeDisconnect(reason)
     p.wg.Wait()
     glog.V(logger.Debug).Infof("%v: Disconnected: %v\n", p, reason)
     return reason
@@ -191,7 +192,7 @@ func (p *Peer) handle(msg Msg) error {
         // check errors because, the connection will be closed after it.
         rlp.Decode(msg.Payload, &reason)
         glog.V(logger.Debug).Infof("%v: Disconnect Requested: %v\n", p, reason[0])
-        return DiscRequested
+        return reason[0]
     case msg.Code < baseProtocolLength:
         // ignore other base protocol messages
         return msg.Discard()
......
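The peer.go hunks change two things: a disconnect requested over p.disc now sends the polite disconnect message before the peer is torn down, and handle() reports the reason the remote actually sent (reason[0]) instead of a blanket DiscRequested. Returning reason[0] through an error return works because a disconnect reason doubles as an error value. The standalone sketch below illustrates that idea with simplified definitions; it reuses the DiscReason and discReasonForError names for readability but is not the p2p package's code:

package main

import (
	"errors"
	"fmt"
)

// DiscReason-style enum: a disconnect code that also satisfies the error
// interface, so it can travel back through an error return like handle()'s.
type DiscReason uint

const (
	DiscRequested DiscReason = iota
	DiscQuitting
)

func (d DiscReason) Error() string {
	switch d {
	case DiscRequested:
		return "disconnect requested"
	case DiscQuitting:
		return "client quitting"
	}
	return fmt.Sprintf("unknown disconnect reason %d", uint(d))
}

// discReasonForError sketches the idea of mapping an error back to a reason:
// if the error already is a reason, report it verbatim.
func discReasonForError(err error) DiscReason {
	var r DiscReason
	if errors.As(err, &r) {
		return r
	}
	return DiscRequested // simplified fallback for ordinary errors
}

func main() {
	// Surfacing the remote's own reason (as in "return reason[0]") lets the
	// caller log DiscQuitting instead of a generic DiscRequested.
	err := error(DiscQuitting)
	fmt.Println("peer disconnected:", discReasonForError(err))
}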
@@ -50,8 +50,6 @@ func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan DiscReason) {
 }

 func TestPeerProtoReadMsg(t *testing.T) {
-    defer testlog(t).detach()
-
     done := make(chan struct{})
     proto := Protocol{
         Name:   "a",
@@ -88,8 +86,6 @@ func TestPeerProtoReadMsg(t *testing.T) {
 }

 func TestPeerProtoEncodeMsg(t *testing.T) {
-    defer testlog(t).detach()
-
     proto := Protocol{
         Name:   "a",
         Length: 2,
@@ -112,8 +108,6 @@ func TestPeerProtoEncodeMsg(t *testing.T) {
 }

 func TestPeerWriteForBroadcast(t *testing.T) {
-    defer testlog(t).detach()
-
     closer, rw, peer, peerErr := testPeer([]Protocol{discard})
     defer closer()
@@ -152,8 +146,6 @@ func TestPeerWriteForBroadcast(t *testing.T) {
 }

 func TestPeerPing(t *testing.T) {
-    defer testlog(t).detach()
-
     closer, rw, _, _ := testPeer(nil)
     defer closer()
     if err := SendItems(rw, pingMsg); err != nil {
@@ -165,26 +157,24 @@ func TestPeerPing(t *testing.T) {
 }

 func TestPeerDisconnect(t *testing.T) {
-    defer testlog(t).detach()
-
     closer, rw, _, disc := testPeer(nil)
     defer closer()
     if err := SendItems(rw, discMsg, DiscQuitting); err != nil {
         t.Fatal(err)
     }
-    if err := ExpectMsg(rw, discMsg, []interface{}{DiscRequested}); err != nil {
-        t.Error(err)
-    }
-    closer()
-    if reason := <-disc; reason != DiscRequested {
-        t.Errorf("run returned wrong reason: got %v, want %v", reason, DiscRequested)
+    select {
+    case reason := <-disc:
+        if reason != DiscQuitting {
+            t.Errorf("run returned wrong reason: got %v, want %v", reason, DiscRequested)
+        }
+    case <-time.After(500 * time.Millisecond):
+        t.Error("peer did not return")
     }
 }

 // This test is supposed to verify that Peer can reliably handle
 // multiple causes of disconnection occurring at the same time.
 func TestPeerDisconnectRace(t *testing.T) {
-    defer testlog(t).detach()
-
     maybe := func() bool { return rand.Intn(1) == 1 }
     for i := 0; i < 1000; i++ {
......
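The rewritten TestPeerDisconnect waits on the disc channel inside a select with a time.After arm, so a peer that never returns fails the test after 500 ms instead of hanging it forever. A generic sketch of that timeout pattern follows; waitOrTimeout is an illustrative helper, not part of the test file:

package main

import (
	"fmt"
	"time"
)

// waitOrTimeout reads one value from a channel, but gives up after the
// given deadline instead of blocking indefinitely.
func waitOrTimeout(ch <-chan string, d time.Duration) (string, bool) {
	select {
	case v := <-ch:
		return v, true
	case <-time.After(d):
		return "", false
	}
}

func main() {
	ch := make(chan string, 1)
	go func() { ch <- "disconnected" }()

	if v, ok := waitOrTimeout(ch, 500*time.Millisecond); ok {
		fmt.Println("got:", v)
	} else {
		fmt.Println("timed out")
	}
}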
@@ -18,12 +18,12 @@ import (
 )

 const (
-    defaultDialTimeout      = 10 * time.Second
+    defaultDialTimeout      = 15 * time.Second
     refreshPeersInterval    = 30 * time.Second
     staticPeerCheckInterval = 15 * time.Second

     // Maximum number of concurrently handshaking inbound connections.
-    maxAcceptConns = 10
+    maxAcceptConns = 50

     // Maximum number of concurrently dialing outbound connections.
     maxDialingConns = 10
......
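Here defaultDialTimeout grows from 10 to 15 seconds and maxAcceptConns from 10 to 50. The hunk does not show how the timeout is applied inside the server; purely as an illustration, bounding an outbound TCP dial in Go typically looks like the sketch below (the address is a placeholder):

package main

import (
	"fmt"
	"net"
	"time"
)

const defaultDialTimeout = 15 * time.Second // matches the new constant above

func main() {
	// net.DialTimeout bounds how long the dial may take before it errors out.
	conn, err := net.DialTimeout("tcp", "127.0.0.1:30303", defaultDialTimeout)
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer conn.Close()
	fmt.Println("connected to", conn.RemoteAddr())
}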
@@ -46,8 +46,6 @@ func startTestServer(t *testing.T, pf newPeerHook) *Server {
 }

 func TestServerListen(t *testing.T) {
-    defer testlog(t).detach()
-
     // start the test server
     connected := make(chan *Peer)
     srv := startTestServer(t, func(p *Peer) {
@@ -78,8 +76,6 @@ func TestServerListen(t *testing.T) {
 }

 func TestServerDial(t *testing.T) {
-    defer testlog(t).detach()
-
     // run a one-shot TCP server to handle the connection.
     listener, err := net.Listen("tcp", "127.0.0.1:0")
     if err != nil {
@@ -126,8 +122,6 @@ func TestServerDial(t *testing.T) {
 }

 func TestServerBroadcast(t *testing.T) {
-    defer testlog(t).detach()
-
     var connected sync.WaitGroup
     srv := startTestServer(t, func(p *Peer) {
         p.running = matchProtocols([]Protocol{discard}, []Cap{discard.cap()}, p.rw)
@@ -172,8 +166,6 @@ func TestServerBroadcast(t *testing.T) {
 //
 // It also serves as a light-weight integration test.
 func TestServerDisconnectAtCap(t *testing.T) {
-    defer testlog(t).detach()
-
     started := make(chan *Peer)
     srv := &Server{
         ListenAddr: "127.0.0.1:0",
@@ -224,8 +216,6 @@ func TestServerDisconnectAtCap(t *testing.T) {

 // Tests that static peers are (re)connected, and done so even above max peers.
 func TestServerStaticPeers(t *testing.T) {
-    defer testlog(t).detach()
-
     // Create a test server with limited connection slots
     started := make(chan *Peer)
     server := &Server{
@@ -312,7 +302,6 @@ func TestServerStaticPeers(t *testing.T) {

 // Tests that trusted peers and can connect above max peer caps.
 func TestServerTrustedPeers(t *testing.T) {
-    defer testlog(t).detach()
     // Create a trusted peer to accept connections from
     key := newkey()
@@ -374,8 +363,6 @@ func TestServerTrustedPeers(t *testing.T) {

 // Tests that a failed dial will temporarily throttle a peer.
 func TestServerMaxPendingDials(t *testing.T) {
-    defer testlog(t).detach()
-
     // Start a simple test server
     server := &Server{
         ListenAddr: "127.0.0.1:0",
@@ -443,8 +430,6 @@ func TestServerMaxPendingDials(t *testing.T) {
 }

 func TestServerMaxPendingAccepts(t *testing.T) {
-    defer testlog(t).detach()
-
     // Start a test server and a peer sink for synchronization
     started := make(chan *Peer)
     server := &Server{
......
-package p2p
-
-import (
-    "testing"
-
-    "github.com/ethereum/go-ethereum/logger"
-)
-
-type testLogger struct{ t *testing.T }
-
-func testlog(t *testing.T) testLogger {
-    logger.Reset()
-    l := testLogger{t}
-    logger.AddLogSystem(l)
-    return l
-}
-
-func (l testLogger) LogPrint(msg logger.LogMsg) {
-    l.t.Logf("%s", msg.String())
-}
-
-func (testLogger) detach() {
-    logger.Flush()
-    logger.Reset()
-}