未验证 提交 068725c5 编写于 作者: L lash 提交者: Rafael Matias

swarm/network, swarm/storage: Preserve opentracing contexts (#19022)

(cherry picked from commit 0c10d376)
上级 710775f4
...@@ -52,6 +52,7 @@ type Fetcher struct { ...@@ -52,6 +52,7 @@ type Fetcher struct {
requestC chan uint8 // channel for incoming requests (with the hopCount value in it) requestC chan uint8 // channel for incoming requests (with the hopCount value in it)
searchTimeout time.Duration searchTimeout time.Duration
skipCheck bool skipCheck bool
ctx context.Context
} }
type Request struct { type Request struct {
...@@ -109,14 +110,14 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory { ...@@ -109,14 +110,14 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
// contain the peers which are actively requesting this chunk, to make sure we // contain the peers which are actively requesting this chunk, to make sure we
// don't request back the chunks from them. // don't request back the chunks from them.
// The created Fetcher is started and returned. // The created Fetcher is started and returned.
func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher { func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peers *sync.Map) storage.NetFetcher {
fetcher := NewFetcher(source, f.request, f.skipCheck) fetcher := NewFetcher(ctx, source, f.request, f.skipCheck)
go fetcher.run(ctx, peersToSkip) go fetcher.run(peers)
return fetcher return fetcher
} }
// NewFetcher creates a new Fetcher for the given chunk address using the given request function. // NewFetcher creates a new Fetcher for the given chunk address using the given request function.
func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher { func NewFetcher(ctx context.Context, addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
return &Fetcher{ return &Fetcher{
addr: addr, addr: addr,
protoRequestFunc: rf, protoRequestFunc: rf,
...@@ -124,14 +125,15 @@ func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher { ...@@ -124,14 +125,15 @@ func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
requestC: make(chan uint8), requestC: make(chan uint8),
searchTimeout: defaultSearchTimeout, searchTimeout: defaultSearchTimeout,
skipCheck: skipCheck, skipCheck: skipCheck,
ctx: ctx,
} }
} }
// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally. // Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) { func (f *Fetcher) Offer(source *enode.ID) {
// First we need to have this select to make sure that we return if context is done // First we need to have this select to make sure that we return if context is done
select { select {
case <-ctx.Done(): case <-f.ctx.Done():
return return
default: default:
} }
...@@ -140,15 +142,15 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) { ...@@ -140,15 +142,15 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements) // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select { select {
case f.offerC <- source: case f.offerC <- source:
case <-ctx.Done(): case <-f.ctx.Done():
} }
} }
// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally. // Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
func (f *Fetcher) Request(ctx context.Context, hopCount uint8) { func (f *Fetcher) Request(hopCount uint8) {
// First we need to have this select to make sure that we return if context is done // First we need to have this select to make sure that we return if context is done
select { select {
case <-ctx.Done(): case <-f.ctx.Done():
return return
default: default:
} }
...@@ -162,13 +164,13 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) { ...@@ -162,13 +164,13 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements) // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select { select {
case f.requestC <- hopCount + 1: case f.requestC <- hopCount + 1:
case <-ctx.Done(): case <-f.ctx.Done():
} }
} }
// start prepares the Fetcher // start prepares the Fetcher
// it keeps the Fetcher alive within the lifecycle of the passed context // it keeps the Fetcher alive within the lifecycle of the passed context
func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { func (f *Fetcher) run(peers *sync.Map) {
var ( var (
doRequest bool // determines if retrieval is initiated in the current iteration doRequest bool // determines if retrieval is initiated in the current iteration
wait *time.Timer // timer for search timeout wait *time.Timer // timer for search timeout
...@@ -219,7 +221,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { ...@@ -219,7 +221,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
doRequest = requested doRequest = requested
// all Fetcher context closed, can quit // all Fetcher context closed, can quit
case <-ctx.Done(): case <-f.ctx.Done():
log.Trace("terminate fetcher", "request addr", f.addr) log.Trace("terminate fetcher", "request addr", f.addr)
// TODO: send cancellations to all peers left over in peers map (i.e., those we requested from) // TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
return return
...@@ -228,7 +230,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { ...@@ -228,7 +230,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// need to issue a new request // need to issue a new request
if doRequest { if doRequest {
var err error var err error
sources, err = f.doRequest(ctx, gone, peers, sources, hopCount) sources, err = f.doRequest(gone, peers, sources, hopCount)
if err != nil { if err != nil {
log.Info("unable to request", "request addr", f.addr, "err", err) log.Info("unable to request", "request addr", f.addr, "err", err)
} }
...@@ -266,7 +268,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { ...@@ -266,7 +268,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// * the peer's address is added to the set of peers to skip // * the peer's address is added to the set of peers to skip
// * the peer's address is removed from prospective sources, and // * the peer's address is removed from prospective sources, and
// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer) // * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) { func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
var i int var i int
var sourceID *enode.ID var sourceID *enode.ID
var quit chan struct{} var quit chan struct{}
...@@ -283,7 +285,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki ...@@ -283,7 +285,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
for i = 0; i < len(sources); i++ { for i = 0; i < len(sources); i++ {
req.Source = sources[i] req.Source = sources[i]
var err error var err error
sourceID, quit, err = f.protoRequestFunc(ctx, req) sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err == nil { if err == nil {
// remove the peer from known sources // remove the peer from known sources
// Note: we can modify the source although we are looping on it, because we break from the loop immediately // Note: we can modify the source although we are looping on it, because we break from the loop immediately
...@@ -297,7 +299,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki ...@@ -297,7 +299,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
if !foundSource { if !foundSource {
req.Source = nil req.Source = nil
var err error var err error
sourceID, quit, err = f.protoRequestFunc(ctx, req) sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err != nil { if err != nil {
// if no peers found to request from // if no peers found to request from
return sources, err return sources, err
...@@ -314,7 +316,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki ...@@ -314,7 +316,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
select { select {
case <-quit: case <-quit:
gone <- sourceID gone <- sourceID
case <-ctx.Done(): case <-f.ctx.Done():
} }
}() }()
return sources, nil return sources, nil
......
...@@ -69,7 +69,11 @@ func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode ...@@ -69,7 +69,11 @@ func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode
func TestFetcherSingleRequest(t *testing.T) { func TestFetcherSingleRequest(t *testing.T) {
requester := newMockRequester() requester := newMockRequester()
addr := make([]byte, 32) addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
peers := []string{"a", "b", "c", "d"} peers := []string{"a", "b", "c", "d"}
peersToSkip := &sync.Map{} peersToSkip := &sync.Map{}
...@@ -77,13 +81,9 @@ func TestFetcherSingleRequest(t *testing.T) { ...@@ -77,13 +81,9 @@ func TestFetcherSingleRequest(t *testing.T) {
peersToSkip.Store(p, time.Now()) peersToSkip.Store(p, time.Now())
} }
ctx, cancel := context.WithCancel(context.Background()) go fetcher.run(peersToSkip)
defer cancel()
go fetcher.run(ctx, peersToSkip)
rctx := context.Background() fetcher.Request(0)
fetcher.Request(rctx, 0)
select { select {
case request := <-requester.requestC: case request := <-requester.requestC:
...@@ -115,20 +115,19 @@ func TestFetcherSingleRequest(t *testing.T) { ...@@ -115,20 +115,19 @@ func TestFetcherSingleRequest(t *testing.T) {
func TestFetcherCancelStopsFetcher(t *testing.T) { func TestFetcherCancelStopsFetcher(t *testing.T) {
requester := newMockRequester() requester := newMockRequester()
addr := make([]byte, 32) addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)
peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
peersToSkip := &sync.Map{}
// we start the fetcher, and then we immediately cancel the context // we start the fetcher, and then we immediately cancel the context
go fetcher.run(ctx, peersToSkip) go fetcher.run(peersToSkip)
cancel() cancel()
rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer rcancel()
// we call Request with an active context // we call Request with an active context
fetcher.Request(rctx, 0) fetcher.Request(0)
// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
select { select {
...@@ -140,23 +139,23 @@ func TestFetcherCancelStopsFetcher(t *testing.T) { ...@@ -140,23 +139,23 @@ func TestFetcherCancelStopsFetcher(t *testing.T) {
// TestFetchCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request // TestFetchCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request
func TestFetcherCancelStopsRequest(t *testing.T) { func TestFetcherCancelStopsRequest(t *testing.T) {
t.Skip("since context is now per fetcher, this test is likely redundant")
requester := newMockRequester(100 * time.Millisecond) requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32) addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)
peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
// we start the fetcher with an active context fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
go fetcher.run(ctx, peersToSkip)
rctx, rcancel := context.WithCancel(context.Background()) peersToSkip := &sync.Map{}
rcancel()
// we start the fetcher with an active context
go fetcher.run(peersToSkip)
// we call Request with a cancelled context // we call Request with a cancelled context
fetcher.Request(rctx, 0) fetcher.Request(0)
// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
select { select {
...@@ -166,8 +165,7 @@ func TestFetcherCancelStopsRequest(t *testing.T) { ...@@ -166,8 +165,7 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
} }
// if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled // if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled
rctx = context.Background() fetcher.Request(0)
fetcher.Request(rctx, 0)
select { select {
case <-requester.requestC: case <-requester.requestC:
...@@ -182,19 +180,19 @@ func TestFetcherCancelStopsRequest(t *testing.T) { ...@@ -182,19 +180,19 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
func TestFetcherOfferUsesSource(t *testing.T) { func TestFetcherOfferUsesSource(t *testing.T) {
requester := newMockRequester(100 * time.Millisecond) requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32) addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)
peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
peersToSkip := &sync.Map{}
// start the fetcher // start the fetcher
go fetcher.run(ctx, peersToSkip) go fetcher.run(peersToSkip)
rctx := context.Background()
// call the Offer function with the source peer // call the Offer function with the source peer
fetcher.Offer(rctx, &sourcePeerID) fetcher.Offer(&sourcePeerID)
// fetcher should not initiate request // fetcher should not initiate request
select { select {
...@@ -204,8 +202,7 @@ func TestFetcherOfferUsesSource(t *testing.T) { ...@@ -204,8 +202,7 @@ func TestFetcherOfferUsesSource(t *testing.T) {
} }
// call Request after the Offer // call Request after the Offer
rctx = context.Background() fetcher.Request(0)
fetcher.Request(rctx, 0)
// there should be exactly 1 request coming from fetcher // there should be exactly 1 request coming from fetcher
var request *Request var request *Request
...@@ -234,19 +231,19 @@ func TestFetcherOfferUsesSource(t *testing.T) { ...@@ -234,19 +231,19 @@ func TestFetcherOfferUsesSource(t *testing.T) {
func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) { func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
requester := newMockRequester(100 * time.Millisecond) requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32) addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)
peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
peersToSkip := &sync.Map{}
// start the fetcher // start the fetcher
go fetcher.run(ctx, peersToSkip) go fetcher.run(peersToSkip)
// call Request first // call Request first
rctx := context.Background() fetcher.Request(0)
fetcher.Request(rctx, 0)
// there should be a request coming from fetcher // there should be a request coming from fetcher
var request *Request var request *Request
...@@ -260,7 +257,7 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) { ...@@ -260,7 +257,7 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
} }
// after the Request call Offer // after the Request call Offer
fetcher.Offer(context.Background(), &sourcePeerID) fetcher.Offer(&sourcePeerID)
// there should be a request coming from fetcher // there should be a request coming from fetcher
select { select {
...@@ -283,21 +280,21 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) { ...@@ -283,21 +280,21 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
func TestFetcherRetryOnTimeout(t *testing.T) { func TestFetcherRetryOnTimeout(t *testing.T) {
requester := newMockRequester() requester := newMockRequester()
addr := make([]byte, 32) addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
// set searchTimeOut to low value so the test is quicker // set searchTimeOut to low value so the test is quicker
fetcher.searchTimeout = 250 * time.Millisecond fetcher.searchTimeout = 250 * time.Millisecond
peersToSkip := &sync.Map{} peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// start the fetcher // start the fetcher
go fetcher.run(ctx, peersToSkip) go fetcher.run(peersToSkip)
// call the fetch function with an active context // call the fetch function with an active context
rctx := context.Background() fetcher.Request(0)
fetcher.Request(rctx, 0)
// after 100ms the first request should be initiated // after 100ms the first request should be initiated
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
...@@ -339,7 +336,7 @@ func TestFetcherFactory(t *testing.T) { ...@@ -339,7 +336,7 @@ func TestFetcherFactory(t *testing.T) {
fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip) fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip)
fetcher.Request(context.Background(), 0) fetcher.Request(0)
// check if the created fetchFunction really starts a fetcher and initiates a request // check if the created fetchFunction really starts a fetcher and initiates a request
select { select {
...@@ -353,7 +350,11 @@ func TestFetcherFactory(t *testing.T) { ...@@ -353,7 +350,11 @@ func TestFetcherFactory(t *testing.T) {
func TestFetcherRequestQuitRetriesRequest(t *testing.T) { func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
requester := newMockRequester() requester := newMockRequester()
addr := make([]byte, 32) addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
// make sure the searchTimeout is long so it is sure the request is not // make sure the searchTimeout is long so it is sure the request is not
// retried because of timeout // retried because of timeout
...@@ -361,13 +362,9 @@ func TestFetcherRequestQuitRetriesRequest(t *testing.T) { ...@@ -361,13 +362,9 @@ func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
peersToSkip := &sync.Map{} peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background()) go fetcher.run(peersToSkip)
defer cancel()
go fetcher.run(ctx, peersToSkip)
rctx := context.Background() fetcher.Request(0)
fetcher.Request(rctx, 0)
select { select {
case <-requester.requestC: case <-requester.requestC:
...@@ -460,17 +457,15 @@ func TestRequestSkipPeerPermanent(t *testing.T) { ...@@ -460,17 +457,15 @@ func TestRequestSkipPeerPermanent(t *testing.T) {
func TestFetcherMaxHopCount(t *testing.T) { func TestFetcherMaxHopCount(t *testing.T) {
requester := newMockRequester() requester := newMockRequester()
addr := make([]byte, 32) addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
peersToSkip := &sync.Map{} fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
go fetcher.run(ctx, peersToSkip) peersToSkip := &sync.Map{}
rctx := context.Background() go fetcher.run(peersToSkip)
fetcher.Request(rctx, maxHopCount)
// if hopCount is already at max no request should be initiated // if hopCount is already at max no request should be initiated
select { select {
......
...@@ -144,7 +144,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * ...@@ -144,7 +144,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
ctx, osp = spancontext.StartSpan( ctx, osp = spancontext.StartSpan(
ctx, ctx,
"retrieve.request") "retrieve.request")
defer osp.Finish()
s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true)) s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
if err != nil { if err != nil {
...@@ -167,6 +166,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * ...@@ -167,6 +166,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
}() }()
go func() { go func() {
defer osp.Finish()
chunk, err := d.chunkStore.Get(ctx, req.Addr) chunk, err := d.chunkStore.Get(ctx, req.Addr)
if err != nil { if err != nil {
retrieveChunkFail.Inc(1) retrieveChunkFail.Inc(1)
...@@ -213,11 +213,12 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch ...@@ -213,11 +213,12 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
ctx, osp = spancontext.StartSpan( ctx, osp = spancontext.StartSpan(
ctx, ctx,
"chunk.delivery") "chunk.delivery")
defer osp.Finish()
processReceivedChunksCount.Inc(1) processReceivedChunksCount.Inc(1)
go func() { go func() {
defer osp.Finish()
req.peer = sp req.peer = sp
err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
if err != nil { if err != nil {
...@@ -271,7 +272,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) ( ...@@ -271,7 +272,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
Addr: req.Addr, Addr: req.Addr,
SkipCheck: req.SkipCheck, SkipCheck: req.SkipCheck,
HopCount: req.HopCount, HopCount: req.HopCount,
}, Top) }, Top, "request.from.peers")
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
......
...@@ -300,7 +300,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg ...@@ -300,7 +300,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
return return
} }
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
err := p.SendPriority(ctx, msg, c.priority) err := p.SendPriority(ctx, msg, c.priority, "")
if err != nil { if err != nil {
log.Warn("SendPriority error", "err", err) log.Warn("SendPriority error", "err", err)
} }
......
...@@ -65,6 +65,7 @@ type Peer struct { ...@@ -65,6 +65,7 @@ type Peer struct {
// on creating a new client in offered hashes handler. // on creating a new client in offered hashes handler.
clientParams map[Stream]*clientParams clientParams map[Stream]*clientParams
quit chan struct{} quit chan struct{}
spans sync.Map
} }
type WrappedPriorityMsg struct { type WrappedPriorityMsg struct {
...@@ -82,10 +83,16 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { ...@@ -82,10 +83,16 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
clients: make(map[Stream]*client), clients: make(map[Stream]*client),
clientParams: make(map[Stream]*clientParams), clientParams: make(map[Stream]*clientParams),
quit: make(chan struct{}), quit: make(chan struct{}),
spans: sync.Map{},
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go p.pq.Run(ctx, func(i interface{}) { go p.pq.Run(ctx, func(i interface{}) {
wmsg := i.(WrappedPriorityMsg) wmsg := i.(WrappedPriorityMsg)
defer p.spans.Delete(wmsg.Context)
sp, ok := p.spans.Load(wmsg.Context)
if ok {
defer sp.(opentracing.Span).Finish()
}
err := p.Send(wmsg.Context, wmsg.Msg) err := p.Send(wmsg.Context, wmsg.Msg)
if err != nil { if err != nil {
log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
...@@ -130,7 +137,6 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { ...@@ -130,7 +137,6 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
// Deliver sends a storeRequestMsg protocol message to the peer // Deliver sends a storeRequestMsg protocol message to the peer
// Depending on the `syncing` parameter we send different message types // Depending on the `syncing` parameter we send different message types
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error { func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
var sp opentracing.Span
var msg interface{} var msg interface{}
spanName := "send.chunk.delivery" spanName := "send.chunk.delivery"
...@@ -151,18 +157,22 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, ...@@ -151,18 +157,22 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8,
} }
spanName += ".retrieval" spanName += ".retrieval"
} }
ctx, sp = spancontext.StartSpan(
ctx,
spanName)
defer sp.Finish()
return p.SendPriority(ctx, msg, priority) return p.SendPriority(ctx, msg, priority, spanName)
} }
// SendPriority sends message to the peer using the outgoing priority queue // SendPriority sends message to the peer using the outgoing priority queue
func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error { func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8, traceId string) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
if traceId != "" {
var sp opentracing.Span
ctx, sp = spancontext.StartSpan(
ctx,
traceId,
)
p.spans.Store(ctx, sp)
}
wmsg := WrappedPriorityMsg{ wmsg := WrappedPriorityMsg{
Context: ctx, Context: ctx,
Msg: msg, Msg: msg,
...@@ -205,7 +215,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { ...@@ -205,7 +215,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
Stream: s.stream, Stream: s.stream,
} }
log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
return p.SendPriority(ctx, msg, s.priority) return p.SendPriority(ctx, msg, s.priority, "send.offered.hashes")
} }
func (p *Peer) getServer(s Stream) (*server, error) { func (p *Peer) getServer(s Stream) (*server, error) {
......
...@@ -359,7 +359,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8 ...@@ -359,7 +359,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8
} }
log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
return peer.SendPriority(context.TODO(), msg, priority) return peer.SendPriority(context.TODO(), msg, priority, "")
} }
func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
...@@ -730,7 +730,8 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error ...@@ -730,7 +730,8 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error
if err != nil { if err != nil {
return err return err
} }
if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
if err := p.SendPriority(context.TODO(), tp, c.priority, ""); err != nil {
return err return err
} }
if c.to > 0 && tp.Takeover.End >= c.to { if c.to > 0 && tp.Takeover.End >= c.to {
......
...@@ -465,7 +465,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { ...@@ -465,7 +465,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
length *= r.chunkSize length *= r.chunkSize
} }
wg.Add(1) wg.Add(1)
go r.join(b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) go r.join(cctx, b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC)
go func() { go func() {
wg.Wait() wg.Wait()
close(errC) close(errC)
...@@ -485,7 +485,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { ...@@ -485,7 +485,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
return len(b), nil return len(b), nil
} }
func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
defer parentWg.Done() defer parentWg.Done()
// find appropriate block level // find appropriate block level
for chunkData.Size() < uint64(treeSize) && depth > r.depth { for chunkData.Size() < uint64(treeSize) && depth > r.depth {
...@@ -533,7 +533,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS ...@@ -533,7 +533,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS
go func(j int64) { go func(j int64) {
childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize]
startTime := time.Now() startTime := time.Now()
chunkData, err := r.getter.Get(r.ctx, Reference(childAddress)) chunkData, err := r.getter.Get(ctx, Reference(childAddress))
if err != nil { if err != nil {
metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime) metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime)
log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err) log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err)
...@@ -554,7 +554,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS ...@@ -554,7 +554,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS
if soff < off { if soff < off {
soff = off soff = off
} }
r.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) r.join(ctx, b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC)
}(i) }(i)
} //for } //for
} }
...@@ -581,6 +581,11 @@ var errWhence = errors.New("Seek: invalid whence") ...@@ -581,6 +581,11 @@ var errWhence = errors.New("Seek: invalid whence")
var errOffset = errors.New("Seek: invalid offset") var errOffset = errors.New("Seek: invalid offset")
func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
cctx, sp := spancontext.StartSpan(
r.ctx,
"lcr.seek")
defer sp.Finish()
log.Debug("lazychunkreader.seek", "key", r.addr, "offset", offset) log.Debug("lazychunkreader.seek", "key", r.addr, "offset", offset)
switch whence { switch whence {
default: default:
...@@ -590,8 +595,9 @@ func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { ...@@ -590,8 +595,9 @@ func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
case 1: case 1:
offset += r.off offset += r.off
case 2: case 2:
if r.chunkData == nil { //seek from the end requires rootchunk for size. call Size first if r.chunkData == nil { //seek from the end requires rootchunk for size. call Size first
_, err := r.Size(context.TODO(), nil) _, err := r.Size(cctx, nil)
if err != nil { if err != nil {
return 0, fmt.Errorf("can't get size: %v", err) return 0, fmt.Errorf("can't get size: %v", err)
} }
......
...@@ -40,9 +40,9 @@ func (t *TestHandler) Close() { ...@@ -40,9 +40,9 @@ func (t *TestHandler) Close() {
type mockNetFetcher struct{} type mockNetFetcher struct{}
func (m *mockNetFetcher) Request(ctx context.Context, hopCount uint8) { func (m *mockNetFetcher) Request(hopCount uint8) {
} }
func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) { func (m *mockNetFetcher) Offer(source *enode.ID) {
} }
func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetFetcher { func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetFetcher {
......
...@@ -34,8 +34,8 @@ type ( ...@@ -34,8 +34,8 @@ type (
) )
type NetFetcher interface { type NetFetcher interface {
Request(ctx context.Context, hopCount uint8) Request(hopCount uint8)
Offer(ctx context.Context, source *enode.ID) Offer(source *enode.ID)
} }
// NetStore is an extension of local storage // NetStore is an extension of local storage
...@@ -150,7 +150,7 @@ func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Co ...@@ -150,7 +150,7 @@ func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Co
} }
// The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one // The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one
// if it doesn't exist yet // if it doesn't exist yet
f := n.getOrCreateFetcher(ref) f := n.getOrCreateFetcher(ctx, ref)
// If the caller needs the chunk, it has to use the returned fetch function to get it // If the caller needs the chunk, it has to use the returned fetch function to get it
return nil, f.Fetch, nil return nil, f.Fetch, nil
} }
...@@ -168,7 +168,7 @@ func (n *NetStore) Has(ctx context.Context, ref Address) bool { ...@@ -168,7 +168,7 @@ func (n *NetStore) Has(ctx context.Context, ref Address) bool {
// getOrCreateFetcher attempts at retrieving an existing fetchers // getOrCreateFetcher attempts at retrieving an existing fetchers
// if none exists, creates one and saves it in the fetchers cache // if none exists, creates one and saves it in the fetchers cache
// caller must hold the lock // caller must hold the lock
func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher { func (n *NetStore) getOrCreateFetcher(ctx context.Context, ref Address) *fetcher {
if f := n.getFetcher(ref); f != nil { if f := n.getFetcher(ref); f != nil {
return f return f
} }
...@@ -176,7 +176,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher { ...@@ -176,7 +176,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher {
// no fetcher for the given address, we have to create a new one // no fetcher for the given address, we have to create a new one
key := hex.EncodeToString(ref) key := hex.EncodeToString(ref)
// create the context during which fetching is kept alive // create the context during which fetching is kept alive
ctx, cancel := context.WithTimeout(context.Background(), fetcherTimeout) cctx, cancel := context.WithTimeout(ctx, fetcherTimeout)
// destroy is called when all requests finish // destroy is called when all requests finish
destroy := func() { destroy := func() {
// remove fetcher from fetchers // remove fetcher from fetchers
...@@ -190,7 +190,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher { ...@@ -190,7 +190,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher {
// the peers which requested the chunk should not be requested to deliver it. // the peers which requested the chunk should not be requested to deliver it.
peers := &sync.Map{} peers := &sync.Map{}
fetcher := newFetcher(ref, n.NewNetFetcherFunc(ctx, ref, peers), destroy, peers, n.closeC) fetcher := newFetcher(ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC)
n.fetchers.Add(key, fetcher) n.fetchers.Add(key, fetcher)
return fetcher return fetcher
...@@ -278,9 +278,9 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) { ...@@ -278,9 +278,9 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
if err := source.UnmarshalText([]byte(sourceIF.(string))); err != nil { if err := source.UnmarshalText([]byte(sourceIF.(string))); err != nil {
return nil, err return nil, err
} }
f.netFetcher.Offer(rctx, &source) f.netFetcher.Offer(&source)
} else { } else {
f.netFetcher.Request(rctx, hopCount) f.netFetcher.Request(hopCount)
} }
// wait until either the chunk is delivered or the context is done // wait until either the chunk is delivered or the context is done
......
...@@ -46,12 +46,12 @@ type mockNetFetcher struct { ...@@ -46,12 +46,12 @@ type mockNetFetcher struct {
mu sync.Mutex mu sync.Mutex
} }
func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) { func (m *mockNetFetcher) Offer(source *enode.ID) {
m.offerCalled = true m.offerCalled = true
m.sources = append(m.sources, source) m.sources = append(m.sources, source)
} }
func (m *mockNetFetcher) Request(ctx context.Context, hopCount uint8) { func (m *mockNetFetcher) Request(hopCount uint8) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册