diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 0429c4dffcfc40e415c902a7caefe7f969b02f9c..9092ffe3ed47acafd500c7ff0fe2ff233c741c95 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -173,7 +173,8 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * return } if req.SkipCheck { - err = sp.Deliver(ctx, chunk, s.priority) + syncing := false + err = sp.Deliver(ctx, chunk, s.priority, syncing) if err != nil { log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) } @@ -189,12 +190,22 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * return nil } +//Chunk delivery always uses the same message type.... type ChunkDeliveryMsg struct { Addr storage.Address SData []byte // the stored chunk Data (incl size) peer *Peer // set in handleChunkDeliveryMsg } +//...but swap accounting needs to disambiguate if it is a delivery for syncing or for retrieval +//as it decides based on message type if it needs to account for this message or not + +//defines a chunk delivery for retrieval (with accounting) +type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg + +//defines a chunk delivery for syncing (without accounting) +type ChunkDeliveryMsgSyncing ChunkDeliveryMsg + // TODO: Fix context SNAFU func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { var osp opentracing.Span diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index 0fe3e5eb4f7b73463edfff0e8ca912230145899f..eb1b2983e1234310b1e50e19e9b7a0c67260e4ae 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -357,7 +357,8 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err) } chunk := storage.NewChunk(hash, data) - if err := p.Deliver(ctx, chunk, s.priority); err != nil { + syncing := true + if err := p.Deliver(ctx, chunk, s.priority, syncing); err != nil { return err } } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 89d135ad5237c1cad4ab500fae76fba81fcc5d0b..4bccf56f5dd71184a46e86b9b25c75216ca29489 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -128,17 +128,34 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { } // Deliver sends a storeRequestMsg protocol message to the peer -func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error { +// Depending on the `syncing` parameter we send different message types +func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error { var sp opentracing.Span + var msg interface{} + + spanName := "send.chunk.delivery" + + //we send different types of messages if delivery is for syncing or retrievals, + //even if handling and content of the message are the same, + //because swap accounting decides which messages need accounting based on the message type + if syncing { + msg = &ChunkDeliveryMsgSyncing{ + Addr: chunk.Address(), + SData: chunk.Data(), + } + spanName += ".syncing" + } else { + msg = &ChunkDeliveryMsgRetrieval{ + Addr: chunk.Address(), + SData: chunk.Data(), + } + spanName += ".retrieval" + } ctx, sp = spancontext.StartSpan( ctx, - "send.chunk.delivery") + spanName) defer sp.Finish() - msg := &ChunkDeliveryMsg{ - Addr: chunk.Address(), - SData: chunk.Data(), - } return p.SendPriority(ctx, msg, priority) } diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 9d0e6c68b8190e521cad27a74d8f9dde3a4712a0..0ac374def12f648d443c1c005a728e7fa2f3972c 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -489,8 +489,13 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { case *WantedHashesMsg: return p.handleWantedHashesMsg(ctx, msg) - case *ChunkDeliveryMsg: - return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg) + case *ChunkDeliveryMsgRetrieval: + //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg + return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) + + case *ChunkDeliveryMsgSyncing: + //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg + return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) case *RetrieveRequestMsg: return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) @@ -681,7 +686,7 @@ func (c *clientParams) clientCreated() { // Spec is the spec of the streamer protocol var Spec = &protocols.Spec{ Name: "stream", - Version: 7, + Version: 8, MaxMsgSize: 10 * 1024 * 1024, Messages: []interface{}{ UnsubscribeMsg{}, @@ -690,10 +695,11 @@ var Spec = &protocols.Spec{ TakeoverProofMsg{}, SubscribeMsg{}, RetrieveRequestMsg{}, - ChunkDeliveryMsg{}, + ChunkDeliveryMsgRetrieval{}, SubscribeErrorMsg{}, RequestSubscriptionMsg{}, QuitMsg{}, + ChunkDeliveryMsgSyncing{}, }, }