提交 47a7fe5d 编写于 作者: P Péter Szilágyi

eth: port the synchronisation algo to eth/62

上级 ca88e18f
......@@ -140,7 +140,7 @@ var (
}
EthVersionFlag = cli.IntFlag{
Name: "eth",
Value: 61,
Value: 62,
Usage: "Highest eth protocol to advertise (temporary, dev option)",
}
......
......@@ -357,6 +357,20 @@ func (b *Block) WithMiningResult(nonce uint64, mixDigest common.Hash) *Block {
}
}
// WithBody returns a new block with the given transaction and uncle contents.
func (b *Block) WithBody(transactions []*Transaction, uncles []*Header) *Block {
block := &Block{
header: copyHeader(b.header),
transactions: make([]*Transaction, len(transactions)),
uncles: make([]*Header, len(uncles)),
}
copy(block.transactions, transactions)
for i := range uncles {
block.uncles[i] = copyHeader(uncles[i])
}
return block
}
// Implement pow.Block
func (b *Block) Hash() common.Hash {
......
此差异已折叠。
此差异已折叠。
......@@ -31,10 +31,16 @@ import (
"gopkg.in/fatih/set.v0"
)
// Hash and block fetchers belonging to eth/61 and below
type relativeHashFetcherFn func(common.Hash) error
type absoluteHashFetcherFn func(uint64, int) error
type blockFetcherFn func([]common.Hash) error
// Block header and body fethers belonging to eth/62 and above
type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
type blockBodyFetcherFn func([]common.Hash) error
var (
errAlreadyFetching = errors.New("already fetching blocks from peer")
errAlreadyRegistered = errors.New("peer is already registered")
......@@ -54,25 +60,37 @@ type peer struct {
ignored *set.Set // Set of hashes not to request (didn't have previously)
getRelHashes relativeHashFetcherFn // Method to retrieve a batch of hashes from an origin hash
getAbsHashes absoluteHashFetcherFn // Method to retrieve a batch of hashes from an absolute position
getBlocks blockFetcherFn // Method to retrieve a batch of blocks
getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
getBlocks blockFetcherFn // [eth/61] Method to retrieve a batch of blocks
getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash
getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
version int // Eth protocol version number to switch strategies
}
// newPeer create a new downloader peer, with specific hash and block retrieval
// mechanisms.
func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) *peer {
func newPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) *peer {
return &peer{
id: id,
head: head,
capacity: 1,
id: id,
head: head,
capacity: 1,
ignored: set.New(),
getRelHashes: getRelHashes,
getAbsHashes: getAbsHashes,
getBlocks: getBlocks,
ignored: set.New(),
version: version,
getRelHeaders: getRelHeaders,
getAbsHeaders: getAbsHeaders,
getBlockBodies: getBlockBodies,
version: version,
}
}
......@@ -83,8 +101,8 @@ func (p *peer) Reset() {
p.ignored.Clear()
}
// Fetch sends a block retrieval request to the remote peer.
func (p *peer) Fetch(request *fetchRequest) error {
// Fetch61 sends a block retrieval request to the remote peer.
func (p *peer) Fetch61(request *fetchRequest) error {
// Short circuit if the peer is already fetching
if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
return errAlreadyFetching
......@@ -101,10 +119,28 @@ func (p *peer) Fetch(request *fetchRequest) error {
return nil
}
// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Fetch sends a block body retrieval request to the remote peer.
func (p *peer) Fetch(request *fetchRequest) error {
// Short circuit if the peer is already fetching
if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
return errAlreadyFetching
}
p.started = time.Now()
// Convert the header set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Headers))
for _, header := range request.Headers {
hashes = append(hashes, header.Hash())
}
go p.getBlockBodies(hashes)
return nil
}
// SetIdle61 sets the peer to idle, allowing it to execute new retrieval requests.
// Its block retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
func (p *peer) SetIdle() {
func (p *peer) SetIdle61() {
// Update the peer's download allowance based on previous performance
scale := 2.0
if time.Since(p.started) > blockSoftTTL {
......@@ -131,6 +167,36 @@ func (p *peer) SetIdle() {
atomic.StoreInt32(&p.idle, 0)
}
// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its block body retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
func (p *peer) SetIdle() {
// Update the peer's download allowance based on previous performance
scale := 2.0
if time.Since(p.started) > bodySoftTTL {
scale = 0.5
if time.Since(p.started) > bodyHardTTL {
scale = 1 / float64(MaxBodyFetch) // reduces capacity to 1
}
}
for {
// Calculate the new download bandwidth allowance
prev := atomic.LoadInt32(&p.capacity)
next := int32(math.Max(1, math.Min(float64(MaxBodyFetch), float64(prev)*scale)))
// Try to update the old value
if atomic.CompareAndSwapInt32(&p.capacity, prev, next) {
// If we're having problems at 1 capacity, try to find better peers
if next == 1 {
p.Demote()
}
break
}
}
// Set the peer to idle to allow further block requests
atomic.StoreInt32(&p.idle, 0)
}
// Capacity retrieves the peers block download allowance based on its previously
// discovered bandwidth capacity.
func (p *peer) Capacity() int {
......
......@@ -43,16 +43,20 @@ var (
// fetchRequest is a currently running block retrieval operation.
type fetchRequest struct {
Peer *peer // Peer to which the request was sent
Hashes map[common.Hash]int // Requested hashes with their insertion index (priority)
Time time.Time // Time when the request was made
Peer *peer // Peer to which the request was sent
Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
Headers []*types.Header // [eth/62] Requested headers, sorted by request order
Time time.Time // Time when the request was made
}
// queue represents hashes that are either need fetching or are being fetched
type queue struct {
hashPool map[common.Hash]int // Pending hashes, mapping to their insertion index (priority)
hashQueue *prque.Prque // Priority queue of the block hashes to fetch
hashCounter int // Counter indexing the added hashes to ensure retrieval order
hashPool map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority)
hashQueue *prque.Prque // [eth/61] Priority queue of the block hashes to fetch
hashCounter int // [eth/61] Counter indexing the added hashes to ensure retrieval order
headerPool map[common.Hash]*types.Header // [eth/62] Pending headers, mapping from their hashes
headerQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the bodies for
pendPool map[string]*fetchRequest // Currently pending block retrieval operations
......@@ -66,11 +70,13 @@ type queue struct {
// newQueue creates a new download queue for scheduling block retrieval.
func newQueue() *queue {
return &queue{
hashPool: make(map[common.Hash]int),
hashQueue: prque.New(),
pendPool: make(map[string]*fetchRequest),
blockPool: make(map[common.Hash]uint64),
blockCache: make([]*Block, blockCacheLimit),
hashPool: make(map[common.Hash]int),
hashQueue: prque.New(),
headerPool: make(map[common.Hash]*types.Header),
headerQueue: prque.New(),
pendPool: make(map[string]*fetchRequest),
blockPool: make(map[common.Hash]uint64),
blockCache: make([]*Block, blockCacheLimit),
}
}
......@@ -83,6 +89,9 @@ func (q *queue) Reset() {
q.hashQueue.Reset()
q.hashCounter = 0
q.headerPool = make(map[common.Hash]*types.Header)
q.headerQueue.Reset()
q.pendPool = make(map[string]*fetchRequest)
q.blockPool = make(map[common.Hash]uint64)
......@@ -90,21 +99,21 @@ func (q *queue) Reset() {
q.blockCache = make([]*Block, blockCacheLimit)
}
// Size retrieves the number of hashes in the queue, returning separately for
// Size retrieves the number of blocks in the queue, returning separately for
// pending and already downloaded.
func (q *queue) Size() (int, int) {
q.lock.RLock()
defer q.lock.RUnlock()
return len(q.hashPool), len(q.blockPool)
return len(q.hashPool) + len(q.headerPool), len(q.blockPool)
}
// Pending retrieves the number of hashes pending for retrieval.
// Pending retrieves the number of blocks pending for retrieval.
func (q *queue) Pending() int {
q.lock.RLock()
defer q.lock.RUnlock()
return q.hashQueue.Size()
return q.hashQueue.Size() + q.headerQueue.Size()
}
// InFlight retrieves the number of fetch requests currently in flight.
......@@ -124,7 +133,7 @@ func (q *queue) Throttle() bool {
// Calculate the currently in-flight block requests
pending := 0
for _, request := range q.pendPool {
pending += len(request.Hashes)
pending += len(request.Hashes) + len(request.Headers)
}
// Throttle if more blocks are in-flight than free space in the cache
return pending >= len(q.blockCache)-len(q.blockPool)
......@@ -138,15 +147,18 @@ func (q *queue) Has(hash common.Hash) bool {
if _, ok := q.hashPool[hash]; ok {
return true
}
if _, ok := q.headerPool[hash]; ok {
return true
}
if _, ok := q.blockPool[hash]; ok {
return true
}
return false
}
// Insert adds a set of hashes for the download queue for scheduling, returning
// Insert61 adds a set of hashes for the download queue for scheduling, returning
// the new hashes encountered.
func (q *queue) Insert(hashes []common.Hash, fifo bool) []common.Hash {
func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash {
q.lock.Lock()
defer q.lock.Unlock()
......@@ -172,6 +184,29 @@ func (q *queue) Insert(hashes []common.Hash, fifo bool) []common.Hash {
return inserts
}
// Insert adds a set of headers for the download queue for scheduling, returning
// the new headers encountered.
func (q *queue) Insert(headers []*types.Header) []*types.Header {
q.lock.Lock()
defer q.lock.Unlock()
// Insert all the headers prioritized by the contained block number
inserts := make([]*types.Header, 0, len(headers))
for _, header := range headers {
// Make sure no duplicate requests are executed
hash := header.Hash()
if _, ok := q.headerPool[hash]; ok {
glog.V(logger.Warn).Infof("Header %x already scheduled", hash)
continue
}
// Queue the header for body retrieval
inserts = append(inserts, header)
q.headerPool[hash] = header
q.headerQueue.Push(header, -float32(header.Number.Uint64()))
}
return inserts
}
// GetHeadBlock retrieves the first block from the cache, or nil if it hasn't
// been downloaded yet (or simply non existent).
func (q *queue) GetHeadBlock() *Block {
......@@ -227,9 +262,9 @@ func (q *queue) TakeBlocks() []*Block {
return blocks
}
// Reserve reserves a set of hashes for the given peer, skipping any previously
// Reserve61 reserves a set of hashes for the given peer, skipping any previously
// failed download.
func (q *queue) Reserve(p *peer, count int) *fetchRequest {
func (q *queue) Reserve61(p *peer, count int) *fetchRequest {
q.lock.Lock()
defer q.lock.Unlock()
......@@ -276,6 +311,68 @@ func (q *queue) Reserve(p *peer, count int) *fetchRequest {
return request
}
// Reserve reserves a set of headers for the given peer, skipping any previously
// failed download. Beside the next batch of needed fetches, it also returns a
// flag whether empty blocks were queued requiring processing.
func (q *queue) Reserve(p *peer, count int) (*fetchRequest, bool, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the pool has been depleted, or if the peer's already
// downloading something (sanity check not to corrupt state)
if q.headerQueue.Empty() {
return nil, false, nil
}
if _, ok := q.pendPool[p.id]; ok {
return nil, false, nil
}
// Calculate an upper limit on the bodies we might fetch (i.e. throttling)
space := len(q.blockCache) - len(q.blockPool)
for _, request := range q.pendPool {
space -= len(request.Headers)
}
// Retrieve a batch of headers, skipping previously failed ones
send := make([]*types.Header, 0, count)
skip := make([]*types.Header, 0)
process := false
for proc := 0; proc < space && len(send) < count && !q.headerQueue.Empty(); proc++ {
header := q.headerQueue.PopItem().(*types.Header)
// If the header defines an empty block, deliver straight
if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
if err := q.enqueue("", types.NewBlockWithHeader(header)); err != nil {
return nil, false, errInvalidChain
}
delete(q.headerPool, header.Hash())
process, space, proc = true, space-1, proc-1
continue
}
// If it's a content block, add to the body fetch request
if p.ignored.Has(header.Hash()) {
skip = append(skip, header)
} else {
send = append(send, header)
}
}
// Merge all the skipped headers back
for _, header := range skip {
q.headerQueue.Push(header, -float32(header.Number.Uint64()))
}
// Assemble and return the block download request
if len(send) == 0 {
return nil, process, nil
}
request := &fetchRequest{
Peer: p,
Headers: send,
Time: time.Now(),
}
q.pendPool[p.id] = request
return request, process, nil
}
// Cancel aborts a fetch request, returning all pending hashes to the queue.
func (q *queue) Cancel(request *fetchRequest) {
q.lock.Lock()
......@@ -284,6 +381,9 @@ func (q *queue) Cancel(request *fetchRequest) {
for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index))
}
for _, header := range request.Headers {
q.headerQueue.Push(header, -float32(header.Number.Uint64()))
}
delete(q.pendPool, request.Peer.id)
}
......@@ -310,8 +410,8 @@ func (q *queue) Expire(timeout time.Duration) []string {
return peers
}
// Deliver injects a block retrieval response into the download queue.
func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
// Deliver61 injects a block retrieval response into the download queue.
func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) {
q.lock.Lock()
defer q.lock.Unlock()
......@@ -337,19 +437,12 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
errs = append(errs, fmt.Errorf("non-requested block %x", hash))
continue
}
// If a requested block falls out of the range, the hash chain is invalid
index := int(int64(block.NumberU64()) - int64(q.blockOffset))
if index >= len(q.blockCache) || index < 0 {
return errInvalidChain
}
// Otherwise merge the block and mark the hash block
q.blockCache[index] = &Block{
RawBlock: block,
OriginPeer: id,
// Queue the block up for processing
if err := q.enqueue(id, block); err != nil {
return err
}
delete(request.Hashes, hash)
delete(q.hashPool, hash)
q.blockPool[hash] = block.NumberU64()
}
// Return all failed or missing fetches to the queue
for hash, index := range request.Hashes {
......@@ -365,6 +458,88 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
return nil
}
// Deliver injects a block body retrieval response into the download queue.
func (q *queue) Deliver(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the block bodies were never requested
request := q.pendPool[id]
if request == nil {
return errNoFetchesPending
}
delete(q.pendPool, id)
// If no block bodies were retrieved, mark them as unavailable for the origin peer
if len(txLists) == 0 || len(uncleLists) == 0 {
for hash, _ := range request.Headers {
request.Peer.ignored.Add(hash)
}
}
// Assemble each of the block bodies with their headers and queue for processing
errs := make([]error, 0)
for i, header := range request.Headers {
// Short circuit block assembly if no more bodies are found
if i >= len(txLists) || i >= len(uncleLists) {
break
}
// Reconstruct the next block if contents match up
if types.DeriveSha(types.Transactions(txLists[i])) != header.TxHash || types.CalcUncleHash(uncleLists[i]) != header.UncleHash {
errs = []error{errInvalidBody}
break
}
block := types.NewBlockWithHeader(header).WithBody(txLists[i], uncleLists[i])
// Queue the block up for processing
if err := q.enqueue(id, block); err != nil {
errs = []error{err}
break
}
request.Headers[i] = nil
delete(q.headerPool, header.Hash())
}
// Return all failed or missing fetches to the queue
for _, header := range request.Headers {
if header != nil {
q.headerQueue.Push(header, -float32(header.Number.Uint64()))
}
}
// If none of the blocks were good, it's a stale delivery
switch {
case len(errs) == 0:
return nil
case len(errs) == 1 && errs[0] == errInvalidBody:
return errInvalidBody
case len(errs) == 1 && errs[0] == errInvalidChain:
return errInvalidChain
case len(errs) == len(request.Headers):
return errStaleDelivery
default:
return fmt.Errorf("multiple failures: %v", errs)
}
}
// enqueue inserts a new block into the final delivery queue, waiting for pickup
// by the processor.
func (q *queue) enqueue(origin string, block *types.Block) error {
// If a requested block falls out of the range, the hash chain is invalid
index := int(int64(block.NumberU64()) - int64(q.blockOffset))
if index >= len(q.blockCache) || index < 0 {
return errInvalidChain
}
// Otherwise merge the block and mark the hash done
q.blockCache[index] = &Block{
RawBlock: block,
OriginPeer: origin,
}
q.blockPool[block.Header().Hash()] = block.NumberU64()
return nil
}
// Prepare configures the block cache offset to allow accepting inbound blocks.
func (q *queue) Prepare(offset uint64) {
q.lock.Lock()
......
此差异已折叠。
此差异已折叠。
......@@ -201,7 +201,9 @@ func (pm *ProtocolManager) handle(p *peer) error {
defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks); err != nil {
if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks,
p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
......@@ -287,7 +289,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
break
}
// Deliver them all to the downloader for queuing
err := pm.downloader.DeliverHashes(p.id, hashes)
err := pm.downloader.DeliverHashes61(p.id, hashes)
if err != nil {
glog.V(logger.Debug).Infoln(err)
}
......@@ -332,8 +334,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
block.ReceivedAt = msg.ReceivedAt
}
// Filter out any explicitly requested blocks, deliver the rest to the downloader
if blocks := pm.fetcher.Filter(blocks); len(blocks) > 0 {
pm.downloader.DeliverBlocks(p.id, blocks)
if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 {
pm.downloader.DeliverBlocks61(p.id, blocks)
}
// Block header query, collect the requested headers and reply
......@@ -401,6 +403,46 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendBlockHeaders(headers)
case p.version >= eth62 && msg.Code == BlockHeadersMsg:
// A batch of headers arrived to one of our previous requests
var headers []*types.Header
if err := msg.Decode(&headers); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Filter out any explicitly requested headers, deliver the rest to the downloader
filter := len(headers) == 1
if filter {
headers = pm.fetcher.FilterHeaders(headers, time.Now())
}
if len(headers) > 0 || !filter {
err := pm.downloader.DeliverHeaders(p.id, headers)
if err != nil {
glog.V(logger.Debug).Infoln(err)
}
}
case p.version >= eth62 && msg.Code == BlockBodiesMsg:
// A batch of block bodies arrived to one of our previous requests
var request blockBodiesData
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Deliver them all to the downloader for queuing
trasactions := make([][]*types.Transaction, len(request))
uncles := make([][]*types.Header, len(request))
for i, body := range request {
trasactions[i] = body.Transactions
uncles[i] = body.Uncles
}
// Filter out any explicitly requested bodies, deliver the rest to the downloader
if trasactions, uncles := pm.fetcher.FilterBodies(trasactions, uncles, time.Now()); len(trasactions) > 0 || len(uncles) > 0 {
err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
if err != nil {
glog.V(logger.Debug).Infoln(err)
}
}
case p.version >= eth62 && msg.Code == GetBlockBodiesMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
......@@ -522,7 +564,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
for _, block := range unknown {
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks)
if p.version < eth62 {
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks, nil, nil)
} else {
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), nil, p.RequestOneHeader, p.RequestBodies)
}
}
case msg.Code == NewBlockMsg:
......@@ -612,7 +658,11 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
// Otherwise if the block is indeed in out own chain, announce it
if pm.chainman.HasBlock(hash) {
for _, peer := range peers {
peer.SendNewBlockHashes([]common.Hash{hash})
if peer.version < eth62 {
peer.SendNewBlockHashes61([]common.Hash{hash})
} else {
peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
}
}
glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt))
}
......
......@@ -145,15 +145,29 @@ func (p *peer) SendBlocks(blocks []*types.Block) error {
return p2p.Send(p.rw, BlocksMsg, blocks)
}
// SendNewBlockHashes announces the availability of a number of blocks through
// SendNewBlockHashes61 announces the availability of a number of blocks through
// a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash) error {
func (p *peer) SendNewBlockHashes61(hashes []common.Hash) error {
for _, hash := range hashes {
p.knownBlocks.Add(hash)
}
return p2p.Send(p.rw, NewBlockHashesMsg, hashes)
}
// SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
for _, hash := range hashes {
p.knownBlocks.Add(hash)
}
request := make(newBlockHashesData, len(hashes))
for i := 0; i < len(hashes); i++ {
request[i].Hash = hashes[i]
request[i].Number = numbers[i]
}
return p2p.Send(p.rw, NewBlockHashesMsg, request)
}
// SendNewBlock propagates an entire block to a remote peer.
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
p.knownBlocks.Add(block.Hash())
......@@ -185,40 +199,61 @@ func (p *peer) SendReceipts(receipts []*types.Receipt) error {
// RequestHashes fetches a batch of hashes from a peer, starting at from, going
// towards the genesis block.
func (p *peer) RequestHashes(from common.Hash) error {
glog.V(logger.Debug).Infof("%v fetching hashes (%d) from %x...\n", p, downloader.MaxHashFetch, from[:4])
glog.V(logger.Debug).Infof("%v fetching hashes (%d) from %x...", p, downloader.MaxHashFetch, from[:4])
return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesData{from, uint64(downloader.MaxHashFetch)})
}
// RequestHashesFromNumber fetches a batch of hashes from a peer, starting at
// the requested block number, going upwards towards the genesis block.
func (p *peer) RequestHashesFromNumber(from uint64, count int) error {
glog.V(logger.Debug).Infof("%v fetching hashes (%d) from #%d...\n", p, count, from)
glog.V(logger.Debug).Infof("%v fetching hashes (%d) from #%d...", p, count, from)
return p2p.Send(p.rw, GetBlockHashesFromNumberMsg, getBlockHashesFromNumberData{from, uint64(count)})
}
// RequestBlocks fetches a batch of blocks corresponding to the specified hashes.
func (p *peer) RequestBlocks(hashes []common.Hash) error {
glog.V(logger.Debug).Infof("%v fetching %v blocks\n", p, len(hashes))
glog.V(logger.Debug).Infof("%v fetching %v blocks", p, len(hashes))
return p2p.Send(p.rw, GetBlocksMsg, hashes)
}
// RequestHeaders fetches a batch of blocks' headers corresponding to the
// specified hashes.
func (p *peer) RequestHeaders(hashes []common.Hash) error {
glog.V(logger.Debug).Infof("%v fetching %v headers\n", p, len(hashes))
return p2p.Send(p.rw, GetBlockHeadersMsg, hashes)
// RequestHeaders is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *peer) RequestOneHeader(hash common.Hash) error {
glog.V(logger.Debug).Infof("%v fetching a single header: %x", p, hash)
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
}
// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
// specified header query, based on the hash of an origin block.
func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
glog.V(logger.Debug).Infof("%v fetching %d headers from %x, skipping %d (reverse = %v)", p, amount, origin[:4], skip, reverse)
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
}
// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
// specified header query, based on the number of an origin block.
func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
glog.V(logger.Debug).Infof("%v fetching %d headers from #%d, skipping %d (reverse = %v)", p, amount, origin, skip, reverse)
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
}
// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
// specified.
func (p *peer) RequestBodies(hashes []common.Hash) error {
glog.V(logger.Debug).Infof("%v fetching %d block bodies", p, len(hashes))
return p2p.Send(p.rw, GetBlockBodiesMsg, hashes)
}
// RequestNodeData fetches a batch of arbitrary data from a node's known state
// data, corresponding to the specified hashes.
func (p *peer) RequestNodeData(hashes []common.Hash) error {
glog.V(logger.Debug).Infof("%v fetching %v state data\n", p, len(hashes))
glog.V(logger.Debug).Infof("%v fetching %v state data", p, len(hashes))
return p2p.Send(p.rw, GetNodeDataMsg, hashes)
}
// RequestReceipts fetches a batch of transaction receipts from a remote node.
func (p *peer) RequestReceipts(hashes []common.Hash) error {
glog.V(logger.Debug).Infof("%v fetching %v receipts\n", p, len(hashes))
glog.V(logger.Debug).Infof("%v fetching %v receipts", p, len(hashes))
return p2p.Send(p.rw, GetReceiptsMsg, hashes)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册