diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index c5b951344641944555707800e0edf0bda4ecb4ee..c71cfa684d224885cea6fbb08de3cb7af356a848 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -17,8 +17,9 @@ import ( ) const ( - maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk - minDesiredPeerCount = 3 // Amount of peers desired to start syncing + maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk + minDesiredPeerCount = 3 // Amount of peers desired to start syncing + blockTtl = 15 * time.Second // The amount of time it takes for a request to time out ) var ( @@ -96,7 +97,7 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH // add peer to our peer set d.peers[id] = peer // broadcast new peer - //d.newPeerCh <- peer + d.newPeerCh <- peer return nil } @@ -265,6 +266,9 @@ out: // XXX Make synchronous func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error { + atomic.StoreInt32(&d.fetchingHashes, 1) + defer atomic.StoreInt32(&d.fetchingHashes, 0) + glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", hash.Bytes()[:4], p.id) start := time.Now() @@ -275,10 +279,8 @@ func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitia // Add the hash to the queue first d.queue.hashPool.Add(hash) } - // Get the first batch of hashes p.getHashes(hash) - atomic.StoreInt32(&d.fetchingHashes, 1) out: for { @@ -299,14 +301,16 @@ out: d.queue.put(hashSet) // Add hashes to the chunk set - // Check if we're done fetching - if !done && len(hashes) > 0 { + if len(hashes) == 0 { // Make sure the peer actually gave you something valid + glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", p.id) + d.queue.reset() + + break out + } else if !done { // Check if we're done fetching //fmt.Println("re-fetch. current =", d.queue.hashPool.Size()) // Get the next set of hashes p.getHashes(hashes[len(hashes)-1]) - atomic.StoreInt32(&d.fetchingHashes, 1) - } else { - atomic.StoreInt32(&d.fetchingHashes, 0) + } else { // we're done break out } } @@ -319,6 +323,7 @@ out: func (d *Downloader) startFetchingBlocks(p *peer) error { glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "blocks") atomic.StoreInt32(&d.downloadingBlocks, 1) + defer atomic.StoreInt32(&d.downloadingBlocks, 0) start := time.Now() @@ -364,8 +369,6 @@ out: // When there are no more queue and no more `fetching`. We can // safely assume we're done. Another part of the process will check // for parent errors and will re-request anything that's missing - atomic.StoreInt32(&d.downloadingBlocks, 0) - // Break out so that we can process with processing blocks break out } else { // Check for bad peers. Bad peers may indicate a peer not responding @@ -376,7 +379,7 @@ out: d.queue.mu.Lock() var badPeers []string for pid, chunk := range d.queue.fetching { - if time.Since(chunk.itime) > 5*time.Second { + if time.Since(chunk.itime) > blockTtl { badPeers = append(badPeers, pid) // remove peer as good peer from peer list d.UnregisterPeer(pid) @@ -466,8 +469,11 @@ func (d *Downloader) process() error { // to a seperate goroutine where it periodically checks for linked pieces. types.BlockBy(types.Number).Sort(d.queue.blocks) blocks := d.queue.blocks + if len(blocks) == 0 { + return nil + } - glog.V(logger.Debug).Infoln("Inserting chain with", len(blocks), "blocks") + glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number()) var err error // Loop untill we're out of blocks @@ -491,6 +497,11 @@ func (d *Downloader) process() error { } } break + } else if err != nil { + // Reset chain completely. This needs much, much improvement. + // instead: check all blocks leading down to this block false block and remove it + blocks = nil + break } blocks = blocks[max:] } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 4d1aa4e934f643ef23b02fb901223f38e03b6f29..df3bf7087b72a3c9fb4c76cce4e459b8780655be 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -31,6 +31,17 @@ func newqueue() *queue { } } +func (c *queue) reset() { + c.mu.Lock() + defer c.mu.Unlock() + + c.hashPool.Clear() + c.fetchPool.Clear() + c.blockHashes.Clear() + c.blocks = nil + c.fetching = make(map[string]*chunk) +} + // reserve a `max` set of hashes for `p` peer. func (c *queue) get(p *peer, max int) *chunk { c.mu.Lock() diff --git a/eth/handler.go b/eth/handler.go index bb12e19043964d5513b2a33d0e921850d98485a7..f3fad68b7744efbdd3e554cc0a7b94a31c1a18b3 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -95,7 +95,7 @@ func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(protocolVersion, networkId, p, rw) err := manager.handle(peer) - glog.V(logger.Detail).Infof("[%s]: %v\n", peer.id, err) + //glog.V(logger.Detail).Infof("[%s]: %v\n", peer.id, err) return err },