提交 c58918c8 编写于 作者: O obscuren

downloader: moved chunk ignoring. Fixes issue with catching up

上级 43409965
...@@ -18,14 +18,15 @@ import ( ...@@ -18,14 +18,15 @@ import (
) )
const ( const (
maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk
minDesiredPeerCount = 5 // Amount of peers desired to start syncing peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out
blockTtl = 15 * time.Second // The amount of time it takes for a block request to time out hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out
hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out
) )
var ( var (
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
errLowTd = errors.New("peer's TD is too low") errLowTd = errors.New("peer's TD is too low")
errBusy = errors.New("busy") errBusy = errors.New("busy")
errUnknownPeer = errors.New("peer's unknown or unhealthy") errUnknownPeer = errors.New("peer's unknown or unhealthy")
...@@ -127,11 +128,11 @@ out: ...@@ -127,11 +128,11 @@ out:
for { for {
select { select {
case <-d.newPeerCh: case <-d.newPeerCh:
itimer.Stop()
// Meet the `minDesiredPeerCount` before we select our best peer // Meet the `minDesiredPeerCount` before we select our best peer
if len(d.peers) < minDesiredPeerCount { if len(d.peers) < minDesiredPeerCount {
break break
} }
itimer.Stop()
d.selectPeer(d.peers.bestPeer()) d.selectPeer(d.peers.bestPeer())
case <-itimer.C: case <-itimer.C:
...@@ -154,17 +155,18 @@ func (d *Downloader) selectPeer(p *peer) { ...@@ -154,17 +155,18 @@ func (d *Downloader) selectPeer(p *peer) {
// Make sure it's doing neither. Once done we can restart the // Make sure it's doing neither. Once done we can restart the
// downloading process if the TD is higher. For now just get on // downloading process if the TD is higher. For now just get on
// with whatever is going on. This prevents unecessary switching. // with whatever is going on. This prevents unecessary switching.
if !d.isBusy() { if d.isBusy() {
// selected peer must be better than our own return
// XXX we also check the peer's recent hash to make sure we
// don't have it. Some peers report (i think) incorrect TD.
if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
return
}
glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td)
d.syncCh <- syncPack{p, p.recentHash, false}
} }
// selected peer must be better than our own
// XXX we also check the peer's recent hash to make sure we
// don't have it. Some peers report (i think) incorrect TD.
if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
return
}
glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td)
d.syncCh <- syncPack{p, p.recentHash, false}
} }
...@@ -282,6 +284,8 @@ out: ...@@ -282,6 +284,8 @@ out:
// If there are unrequested hashes left start fetching // If there are unrequested hashes left start fetching
// from the available peers. // from the available peers.
if d.queue.hashPool.Size() > 0 { if d.queue.hashPool.Size() > 0 {
was := d.queue.hashPool.Size()
fmt.Println("it was =", was)
availablePeers := d.peers.get(idleState) availablePeers := d.peers.get(idleState)
for _, peer := range availablePeers { for _, peer := range availablePeers {
// Get a possible chunk. If nil is returned no chunk // Get a possible chunk. If nil is returned no chunk
...@@ -301,13 +305,14 @@ out: ...@@ -301,13 +305,14 @@ out:
d.queue.put(chunk.hashes) d.queue.put(chunk.hashes)
} }
} }
fmt.Println("it is =", d.queue.hashPool.Size())
// make sure that we have peers available for fetching. If all peers have been tried // make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error // and all failed throw an error
if len(d.queue.fetching) == 0 { if len(d.queue.fetching) == 0 {
d.queue.reset() d.queue.reset()
return fmt.Errorf("%v avaialable = %d. total = %d", errPeersUnavailable, len(availablePeers), len(d.peers)) return fmt.Errorf("%v peers avaialable = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(availablePeers), len(d.peers), d.queue.hashPool.Size())
} }
} else if len(d.queue.fetching) == 0 { } else if len(d.queue.fetching) == 0 {
......
...@@ -73,7 +73,7 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) error { ...@@ -73,7 +73,7 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) error {
} }
func (dl *downloadTester) getHashes(hash common.Hash) error { func (dl *downloadTester) getHashes(hash common.Hash) error {
dl.downloader.HashCh <- dl.hashes dl.downloader.hashCh <- dl.hashes
return nil return nil
} }
...@@ -109,6 +109,8 @@ func TestDownload(t *testing.T) { ...@@ -109,6 +109,8 @@ func TestDownload(t *testing.T) {
glog.SetV(logger.Detail) glog.SetV(logger.Detail)
glog.SetToStderr(true) glog.SetToStderr(true)
minDesiredPeerCount = 4
hashes := createHashes(0, 1000) hashes := createHashes(0, 1000)
blocks := createBlocksFromHashes(hashes) blocks := createBlocksFromHashes(hashes)
tester := newTester(t, hashes, blocks) tester := newTester(t, hashes, blocks)
...@@ -123,7 +125,7 @@ success: ...@@ -123,7 +125,7 @@ success:
case <-tester.done: case <-tester.done:
break success break success
case <-time.After(10 * time.Second): // XXX this could actually fail on a slow computer case <-time.After(10 * time.Second): // XXX this could actually fail on a slow computer
t.Error("timout") t.Error("timeout")
} }
} }
......
...@@ -71,7 +71,7 @@ type peer struct { ...@@ -71,7 +71,7 @@ type peer struct {
td *big.Int td *big.Int
recentHash common.Hash recentHash common.Hash
requested *set.Set ignored *set.Set
getHashes hashFetcherFn getHashes hashFetcherFn
getBlocks blockFetcherFn getBlocks blockFetcherFn
...@@ -86,7 +86,7 @@ func newPeer(id string, td *big.Int, hash common.Hash, getHashes hashFetcherFn, ...@@ -86,7 +86,7 @@ func newPeer(id string, td *big.Int, hash common.Hash, getHashes hashFetcherFn,
getHashes: getHashes, getHashes: getHashes,
getBlocks: getBlocks, getBlocks: getBlocks,
state: idleState, state: idleState,
requested: set.New(), ignored: set.New(),
} }
} }
...@@ -99,8 +99,6 @@ func (p *peer) fetch(chunk *chunk) error { ...@@ -99,8 +99,6 @@ func (p *peer) fetch(chunk *chunk) error {
return errors.New("peer already fetching chunk") return errors.New("peer already fetching chunk")
} }
p.requested.Merge(chunk.hashes)
// set working state // set working state
p.state = workingState p.state = workingState
// convert the set to a fetchable slice // convert the set to a fetchable slice
...@@ -137,5 +135,5 @@ func (p *peer) demote() { ...@@ -137,5 +135,5 @@ func (p *peer) demote() {
func (p *peer) reset() { func (p *peer) reset() {
p.state = idleState p.state = idleState
p.requested.Clear() p.ignored.Clear()
} }
...@@ -56,16 +56,18 @@ func (c *queue) get(p *peer, max int) *chunk { ...@@ -56,16 +56,18 @@ func (c *queue) get(p *peer, max int) *chunk {
// Create a new set of hashes // Create a new set of hashes
hashes, i := set.New(), 0 hashes, i := set.New(), 0
c.hashPool.Each(func(v interface{}) bool { c.hashPool.Each(func(v interface{}) bool {
// break on limit
if i == limit { if i == limit {
return false return false
} }
// skip any hashes that have previously been requested from the peer
// Skip any hashes that have previously been requested from the peer if p.ignored.Has(v) {
if !p.requested.Has(v) { return true
hashes.Add(v)
i++
} }
hashes.Add(v)
i++
return true return true
}) })
// if no hashes can be requested return a nil chunk // if no hashes can be requested return a nil chunk
...@@ -79,7 +81,7 @@ func (c *queue) get(p *peer, max int) *chunk { ...@@ -79,7 +81,7 @@ func (c *queue) get(p *peer, max int) *chunk {
// Create a new chunk for the seperated hashes. The time is being used // Create a new chunk for the seperated hashes. The time is being used
// to reset the chunk (timeout) // to reset the chunk (timeout)
chunk := &chunk{hashes, time.Now()} chunk := &chunk{p, hashes, time.Now()}
// register as 'fetching' state // register as 'fetching' state
c.fetching[p.id] = chunk c.fetching[p.id] = chunk
...@@ -111,6 +113,12 @@ func (c *queue) deliver(id string, blocks []*types.Block) { ...@@ -111,6 +113,12 @@ func (c *queue) deliver(id string, blocks []*types.Block) {
// If the chunk was never requested simply ignore it // If the chunk was never requested simply ignore it
if chunk != nil { if chunk != nil {
delete(c.fetching, id) delete(c.fetching, id)
// check the length of the returned blocks. If the length of blocks is 0
// we'll assume the peer doesn't know about the chain.
if len(blocks) == 0 {
// So we can ignore the blocks we didn't know about
chunk.peer.ignored.Merge(chunk.hashes)
}
// seperate the blocks and the hashes // seperate the blocks and the hashes
blockHashes := chunk.fetchedHashes(blocks) blockHashes := chunk.fetchedHashes(blocks)
...@@ -118,7 +126,6 @@ func (c *queue) deliver(id string, blocks []*types.Block) { ...@@ -118,7 +126,6 @@ func (c *queue) deliver(id string, blocks []*types.Block) {
c.blockHashes.Merge(blockHashes) c.blockHashes.Merge(blockHashes)
// Add the blocks // Add the blocks
c.blocks = append(c.blocks, blocks...) c.blocks = append(c.blocks, blocks...)
// Add back whatever couldn't be delivered // Add back whatever couldn't be delivered
c.hashPool.Merge(chunk.hashes) c.hashPool.Merge(chunk.hashes)
c.fetchPool.Separate(chunk.hashes) c.fetchPool.Separate(chunk.hashes)
...@@ -134,6 +141,7 @@ func (c *queue) put(hashes *set.Set) { ...@@ -134,6 +141,7 @@ func (c *queue) put(hashes *set.Set) {
} }
type chunk struct { type chunk struct {
peer *peer
hashes *set.Set hashes *set.Set
itime time.Time itime time.Time
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册