提交 9c45b446 编写于 作者: F Felix Lange 提交者: GitHub

Merge pull request #3607 from zsfelfoldi/light-fix2

les: fix private net issues, enable adding peers manually again
...@@ -654,6 +654,10 @@ func MakeNode(ctx *cli.Context, name, gitCommit string) *node.Node { ...@@ -654,6 +654,10 @@ func MakeNode(ctx *cli.Context, name, gitCommit string) *node.Node {
vsn += "-" + gitCommit[:8] vsn += "-" + gitCommit[:8]
} }
// if we're running a light client or server, force enable the v5 peer discovery unless it is explicitly disabled with --nodiscover
// note that explicitly specifying --v5disc overrides --nodiscover, in which case the later only disables v4 discovery
forceV5Discovery := (ctx.GlobalBool(LightModeFlag.Name) || ctx.GlobalInt(LightServFlag.Name) > 0) && !ctx.GlobalBool(NoDiscoverFlag.Name)
config := &node.Config{ config := &node.Config{
DataDir: MakeDataDir(ctx), DataDir: MakeDataDir(ctx),
KeyStoreDir: ctx.GlobalString(KeyStoreDirFlag.Name), KeyStoreDir: ctx.GlobalString(KeyStoreDirFlag.Name),
...@@ -662,8 +666,8 @@ func MakeNode(ctx *cli.Context, name, gitCommit string) *node.Node { ...@@ -662,8 +666,8 @@ func MakeNode(ctx *cli.Context, name, gitCommit string) *node.Node {
Name: name, Name: name,
Version: vsn, Version: vsn,
UserIdent: makeNodeUserIdent(ctx), UserIdent: makeNodeUserIdent(ctx),
NoDiscovery: ctx.GlobalBool(NoDiscoverFlag.Name) || ctx.GlobalBool(LightModeFlag.Name), NoDiscovery: ctx.GlobalBool(NoDiscoverFlag.Name) || ctx.GlobalBool(LightModeFlag.Name), // always disable v4 discovery in light client mode
DiscoveryV5: ctx.GlobalBool(DiscoveryV5Flag.Name) || ctx.GlobalBool(LightModeFlag.Name) || ctx.GlobalInt(LightServFlag.Name) > 0, DiscoveryV5: ctx.GlobalBool(DiscoveryV5Flag.Name) || forceV5Discovery,
DiscoveryV5Addr: MakeDiscoveryV5Address(ctx), DiscoveryV5Addr: MakeDiscoveryV5Address(ctx),
BootstrapNodes: MakeBootstrapNodes(ctx), BootstrapNodes: MakeBootstrapNodes(ctx),
BootstrapNodesV5: MakeBootstrapNodesV5(ctx), BootstrapNodesV5: MakeBootstrapNodesV5(ctx),
......
...@@ -105,7 +105,6 @@ type Config struct { ...@@ -105,7 +105,6 @@ type Config struct {
type LesServer interface { type LesServer interface {
Start(srvr *p2p.Server) Start(srvr *p2p.Server)
Synced()
Stop() Stop()
Protocols() []p2p.Protocol Protocols() []p2p.Protocol
} }
......
...@@ -173,7 +173,7 @@ func NewProtocolManager(config *params.ChainConfig, fastSync bool, networkId int ...@@ -173,7 +173,7 @@ func NewProtocolManager(config *params.ChainConfig, fastSync bool, networkId int
return blockchain.CurrentBlock().NumberU64() return blockchain.CurrentBlock().NumberU64()
} }
inserter := func(blocks types.Blocks) (int, error) { inserter := func(blocks types.Blocks) (int, error) {
manager.setSynced() // Mark initial sync done on any fetcher import atomic.StoreUint32(&manager.synced, 1) // Mark initial sync done on any fetcher import
return manager.insertChain(blocks) return manager.insertChain(blocks)
} }
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
......
...@@ -181,7 +181,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { ...@@ -181,7 +181,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
return return
} }
pm.setSynced() // Mark initial sync done atomic.StoreUint32(&pm.synced, 1) // Mark initial sync done
// If fast sync was enabled, and we synced up, disable it // If fast sync was enabled, and we synced up, disable it
if atomic.LoadUint32(&pm.fastSync) == 1 { if atomic.LoadUint32(&pm.fastSync) == 1 {
...@@ -192,10 +192,3 @@ func (pm *ProtocolManager) synchronise(peer *peer) { ...@@ -192,10 +192,3 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
} }
} }
} }
// setSynced sets the synced flag and notifies the light server if present
func (pm *ProtocolManager) setSynced() {
if atomic.SwapUint32(&pm.synced, 1) == 0 && pm.lesServer != nil {
pm.lesServer.Synced()
}
}
...@@ -160,9 +160,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network ...@@ -160,9 +160,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
if manager.serverPool != nil { if manager.serverPool != nil {
addr := p.RemoteAddr().(*net.TCPAddr) addr := p.RemoteAddr().(*net.TCPAddr)
entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port)) entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
if entry == nil {
return fmt.Errorf("unwanted connection")
}
} }
peer.poolEntry = entry peer.poolEntry = entry
select { select {
......
...@@ -42,9 +42,7 @@ type LesServer struct { ...@@ -42,9 +42,7 @@ type LesServer struct {
fcManager *flowcontrol.ClientManager // nil if our node is client only fcManager *flowcontrol.ClientManager // nil if our node is client only
fcCostStats *requestCostStats fcCostStats *requestCostStats
defParams *flowcontrol.ServerParams defParams *flowcontrol.ServerParams
srvr *p2p.Server stopped bool
synced, stopped bool
lock sync.Mutex
} }
func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
...@@ -70,35 +68,13 @@ func (s *LesServer) Protocols() []p2p.Protocol { ...@@ -70,35 +68,13 @@ func (s *LesServer) Protocols() []p2p.Protocol {
return s.protocolManager.SubProtocols return s.protocolManager.SubProtocols
} }
// Start only starts the actual service if the ETH protocol has already been synced, // Start starts the LES server
// otherwise it will be started by Synced()
func (s *LesServer) Start(srvr *p2p.Server) { func (s *LesServer) Start(srvr *p2p.Server) {
s.lock.Lock() s.protocolManager.Start(srvr)
defer s.lock.Unlock()
s.srvr = srvr
if s.synced {
s.protocolManager.Start(s.srvr)
}
}
// Synced notifies the server that the ETH protocol has been synced and LES service can be started
func (s *LesServer) Synced() {
s.lock.Lock()
defer s.lock.Unlock()
s.synced = true
if s.srvr != nil && !s.stopped {
s.protocolManager.Start(s.srvr)
}
} }
// Stop stops the LES service // Stop stops the LES service
func (s *LesServer) Stop() { func (s *LesServer) Stop() {
s.lock.Lock()
defer s.lock.Unlock()
s.stopped = true
s.fcCostStats.store() s.fcCostStats.store()
s.fcManager.Stop() s.fcManager.Stop()
go func() { go func() {
......
...@@ -160,10 +160,10 @@ func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry { ...@@ -160,10 +160,10 @@ func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
defer pool.lock.Unlock() defer pool.lock.Unlock()
entry := pool.entries[p.ID()] entry := pool.entries[p.ID()]
if entry == nil { if entry == nil {
return nil entry = pool.findOrNewNode(p.ID(), ip, port)
} }
glog.V(logger.Debug).Infof("connecting to %v, state: %v", p.id, entry.state) glog.V(logger.Debug).Infof("connecting to %v, state: %v", p.id, entry.state)
if entry.state != psDialed { if entry.state == psConnected || entry.state == psRegistered {
return nil return nil
} }
pool.connWg.Add(1) pool.connWg.Add(1)
...@@ -250,11 +250,17 @@ type poolStatAdjust struct { ...@@ -250,11 +250,17 @@ type poolStatAdjust struct {
// adjustBlockDelay adjusts the block announce delay statistics of a node // adjustBlockDelay adjusts the block announce delay statistics of a node
func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) { func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
if entry == nil {
return
}
pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time} pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
} }
// adjustResponseTime adjusts the request response time statistics of a node // adjustResponseTime adjusts the request response time statistics of a node
func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) { func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
if entry == nil {
return
}
if timeout { if timeout {
pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time} pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
} else { } else {
...@@ -342,7 +348,9 @@ func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, ...@@ -342,7 +348,9 @@ func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool,
func (pool *serverPool) eventLoop() { func (pool *serverPool) eventLoop() {
lookupCnt := 0 lookupCnt := 0
var convTime mclock.AbsTime var convTime mclock.AbsTime
pool.discSetPeriod <- time.Millisecond * 100 if pool.discSetPeriod != nil {
pool.discSetPeriod <- time.Millisecond * 100
}
for { for {
select { select {
case entry := <-pool.timeout: case entry := <-pool.timeout:
...@@ -375,39 +383,7 @@ func (pool *serverPool) eventLoop() { ...@@ -375,39 +383,7 @@ func (pool *serverPool) eventLoop() {
case node := <-pool.discNodes: case node := <-pool.discNodes:
pool.lock.Lock() pool.lock.Lock()
now := mclock.Now() entry := pool.findOrNewNode(discover.NodeID(node.ID), node.IP, node.TCP)
id := discover.NodeID(node.ID)
entry := pool.entries[id]
if entry == nil {
glog.V(logger.Debug).Infof("discovered %v", node.String())
entry = &poolEntry{
id: id,
addr: make(map[string]*poolEntryAddress),
addrSelect: *newWeightedRandomSelect(),
shortRetry: shortRetryCnt,
}
pool.entries[id] = entry
// initialize previously unknown peers with good statistics to give a chance to prove themselves
entry.connectStats.add(1, initStatsWeight)
entry.delayStats.add(0, initStatsWeight)
entry.responseStats.add(0, initStatsWeight)
entry.timeoutStats.add(0, initStatsWeight)
}
entry.lastDiscovered = now
addr := &poolEntryAddress{
ip: node.IP,
port: node.TCP,
}
if a, ok := entry.addr[addr.strKey()]; ok {
addr = a
} else {
entry.addr[addr.strKey()] = addr
}
addr.lastSeen = now
entry.addrSelect.update(addr)
if !entry.known {
pool.newQueue.setLatest(entry)
}
pool.updateCheckDial(entry) pool.updateCheckDial(entry)
pool.lock.Unlock() pool.lock.Unlock()
...@@ -419,12 +395,16 @@ func (pool *serverPool) eventLoop() { ...@@ -419,12 +395,16 @@ func (pool *serverPool) eventLoop() {
lookupCnt++ lookupCnt++
if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) { if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) {
pool.fastDiscover = false pool.fastDiscover = false
pool.discSetPeriod <- time.Minute if pool.discSetPeriod != nil {
pool.discSetPeriod <- time.Minute
}
} }
} }
case <-pool.quit: case <-pool.quit:
close(pool.discSetPeriod) if pool.discSetPeriod != nil {
close(pool.discSetPeriod)
}
pool.connWg.Wait() pool.connWg.Wait()
pool.saveNodes() pool.saveNodes()
pool.wg.Done() pool.wg.Done()
...@@ -434,6 +414,42 @@ func (pool *serverPool) eventLoop() { ...@@ -434,6 +414,42 @@ func (pool *serverPool) eventLoop() {
} }
} }
func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16) *poolEntry {
now := mclock.Now()
entry := pool.entries[id]
if entry == nil {
glog.V(logger.Debug).Infof("discovered %v", id.String())
entry = &poolEntry{
id: id,
addr: make(map[string]*poolEntryAddress),
addrSelect: *newWeightedRandomSelect(),
shortRetry: shortRetryCnt,
}
pool.entries[id] = entry
// initialize previously unknown peers with good statistics to give a chance to prove themselves
entry.connectStats.add(1, initStatsWeight)
entry.delayStats.add(0, initStatsWeight)
entry.responseStats.add(0, initStatsWeight)
entry.timeoutStats.add(0, initStatsWeight)
}
entry.lastDiscovered = now
addr := &poolEntryAddress{
ip: ip,
port: port,
}
if a, ok := entry.addr[addr.strKey()]; ok {
addr = a
} else {
entry.addr[addr.strKey()] = addr
}
addr.lastSeen = now
entry.addrSelect.update(addr)
if !entry.known {
pool.newQueue.setLatest(entry)
}
return entry
}
// loadNodes loads known nodes and their statistics from the database // loadNodes loads known nodes and their statistics from the database
func (pool *serverPool) loadNodes() { func (pool *serverPool) loadNodes() {
enc, err := pool.db.Get(pool.dbKey) enc, err := pool.db.Get(pool.dbKey)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册