// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package stream import ( "context" "errors" "fmt" "math" "sync" "time" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/protocols" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/pot" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" ) const ( Low uint8 = iota Mid High Top PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top PriorityQueueCap = 4096 // queue capacity HashSize = 32 ) //Enumerate options for syncing and retrieval type SyncingOption int type RetrievalOption int //Syncing options const ( //Syncing disabled SyncingDisabled SyncingOption = iota //Register the client and the server but not subscribe SyncingRegisterOnly //Both client and server funcs are registered, subscribe sent automatically SyncingAutoSubscribe ) const ( //Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only) RetrievalDisabled RetrievalOption = iota //Only the client side of the retrieve request is registered. //(light nodes do not serve retrieve requests) //once the client is registered, subscription to retrieve request stream is always sent RetrievalClientOnly //Both client and server funcs are registered, subscribe sent automatically RetrievalEnabled ) // Registry registry for outgoing and incoming streamer constructors type Registry struct { addr enode.ID api *API skipCheck bool clientMu sync.RWMutex serverMu sync.RWMutex peersMu sync.RWMutex serverFuncs map[string]func(*Peer, string, bool) (Server, error) clientFuncs map[string]func(*Peer, string, bool) (Client, error) peers map[enode.ID]*Peer delivery *Delivery intervalsStore state.Store autoRetrieval bool //automatically subscribe to retrieve request stream maxPeerServers int } // RegistryOptions holds optional values for NewRegistry constructor. type RegistryOptions struct { SkipCheck bool Syncing SyncingOption //Defines syncing behavior Retrieval RetrievalOption //Defines retrieval behavior SyncUpdateDelay time.Duration MaxPeerServers int // The limit of servers for each peer in registry } // NewRegistry is Streamer constructor func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry { if options == nil { options = &RegistryOptions{} } if options.SyncUpdateDelay <= 0 { options.SyncUpdateDelay = 15 * time.Second } //check if retriaval has been disabled retrieval := options.Retrieval != RetrievalDisabled streamer := &Registry{ addr: localID, skipCheck: options.SkipCheck, serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)), clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)), peers: make(map[enode.ID]*Peer), delivery: delivery, intervalsStore: intervalsStore, autoRetrieval: retrieval, maxPeerServers: options.MaxPeerServers, } streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer //if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only) if options.Retrieval == RetrievalEnabled { streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) { if !live { return nil, errors.New("only live retrieval requests supported") } return NewSwarmChunkServer(delivery.chunkStore), nil }) } //if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests) if options.Retrieval != RetrievalDisabled { streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) }) } //If syncing is not disabled, the syncing functions are registered (both client and server) if options.Syncing != SyncingDisabled { RegisterSwarmSyncerServer(streamer, syncChunkStore) RegisterSwarmSyncerClient(streamer, syncChunkStore) } //if syncing is set to automatically subscribe to the syncing stream, start the subscription process if options.Syncing == SyncingAutoSubscribe { // latestIntC function ensures that // - receiving from the in chan is not blocked by processing inside the for loop // - the latest int value is delivered to the loop after the processing is done // In context of NeighbourhoodDepthC: // after the syncing is done updating inside the loop, we do not need to update on the intermediate // depth changes, only to the latest one latestIntC := func(in <-chan int) <-chan int { out := make(chan int, 1) go func() { defer close(out) for i := range in { select { case <-out: default: } out <- i } }() return out } go func() { // wait for kademlia table to be healthy time.Sleep(options.SyncUpdateDelay) kad := streamer.delivery.kad depthC := latestIntC(kad.NeighbourhoodDepthC()) addressBookSizeC := latestIntC(kad.AddrCountC()) // initial requests for syncing subscription to peers streamer.updateSyncing() for depth := range depthC { log.Debug("Kademlia neighbourhood depth change", "depth", depth) // Prevent too early sync subscriptions by waiting until there are no // new peers connecting. Sync streams updating will be done after no // peers are connected for at least SyncUpdateDelay period. timer := time.NewTimer(options.SyncUpdateDelay) // Hard limit to sync update delay, preventing long delays // on a very dynamic network maxTimer := time.NewTimer(3 * time.Minute) loop: for { select { case <-maxTimer.C: // force syncing update when a hard timeout is reached log.Trace("Sync subscriptions update on hard timeout") // request for syncing subscription to new peers streamer.updateSyncing() break loop case <-timer.C: // start syncing as no new peers has been added to kademlia // for some time log.Trace("Sync subscriptions update") // request for syncing subscription to new peers streamer.updateSyncing() break loop case size := <-addressBookSizeC: log.Trace("Kademlia address book size changed on depth change", "size", size) // new peers has been added to kademlia, // reset the timer to prevent early sync subscriptions if !timer.Stop() { <-timer.C } timer.Reset(options.SyncUpdateDelay) } } timer.Stop() maxTimer.Stop() } }() } return streamer } // RegisterClient registers an incoming streamer constructor func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error)) { r.clientMu.Lock() defer r.clientMu.Unlock() r.clientFuncs[stream] = f } // RegisterServer registers an outgoing streamer constructor func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error)) { r.serverMu.Lock() defer r.serverMu.Unlock() r.serverFuncs[stream] = f } // GetClient accessor for incoming streamer constructors func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error) { r.clientMu.RLock() defer r.clientMu.RUnlock() f := r.clientFuncs[stream] if f == nil { return nil, fmt.Errorf("stream %v not registered", stream) } return f, nil } // GetServer accessor for incoming streamer constructors func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error) { r.serverMu.RLock() defer r.serverMu.RUnlock() f := r.serverFuncs[stream] if f == nil { return nil, fmt.Errorf("stream %v not registered", stream) } return f, nil } func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error { // check if the stream is registered if _, err := r.GetServerFunc(s.Name); err != nil { return err } peer := r.getPeer(peerId) if peer == nil { return fmt.Errorf("peer not found %v", peerId) } if _, err := peer.getServer(s); err != nil { if e, ok := err.(*notFoundError); ok && e.t == "server" { // request subscription only if the server for this stream is not created log.Debug("RequestSubscription ", "peer", peerId, "stream", s, "history", h) return peer.Send(context.TODO(), &RequestSubscriptionMsg{ Stream: s, History: h, Priority: prio, }) } return err } log.Trace("RequestSubscription: already subscribed", "peer", peerId, "stream", s, "history", h) return nil } // Subscribe initiates the streamer func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error { // check if the stream is registered if _, err := r.GetClientFunc(s.Name); err != nil { return err } peer := r.getPeer(peerId) if peer == nil { return fmt.Errorf("peer not found %v", peerId) } var to uint64 if !s.Live && h != nil { to = h.To } err := peer.setClientParams(s, newClientParams(priority, to)) if err != nil { return err } if s.Live && h != nil { if err := peer.setClientParams( getHistoryStream(s), newClientParams(getHistoryPriority(priority), h.To), ); err != nil { return err } } msg := &SubscribeMsg{ Stream: s, History: h, Priority: priority, } log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) return peer.SendPriority(context.TODO(), msg, priority) } func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { peer := r.getPeer(peerId) if peer == nil { return fmt.Errorf("peer not found %v", peerId) } msg := &UnsubscribeMsg{ Stream: s, } log.Debug("Unsubscribe ", "peer", peerId, "stream", s) if err := peer.Send(context.TODO(), msg); err != nil { return err } return peer.removeClient(s) } // Quit sends the QuitMsg to the peer to remove the // stream peer client and terminate the streaming. func (r *Registry) Quit(peerId enode.ID, s Stream) error { peer := r.getPeer(peerId) if peer == nil { log.Debug("stream quit: peer not found", "peer", peerId, "stream", s) // if the peer is not found, abort the request return nil } msg := &QuitMsg{ Stream: s, } log.Debug("Quit ", "peer", peerId, "stream", s) return peer.Send(context.TODO(), msg) } func (r *Registry) NodeInfo() interface{} { return nil } func (r *Registry) PeerInfo(id enode.ID) interface{} { return nil } func (r *Registry) Close() error { return r.intervalsStore.Close() } func (r *Registry) getPeer(peerId enode.ID) *Peer { r.peersMu.RLock() defer r.peersMu.RUnlock() return r.peers[peerId] } func (r *Registry) setPeer(peer *Peer) { r.peersMu.Lock() r.peers[peer.ID()] = peer metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers))) r.peersMu.Unlock() } func (r *Registry) deletePeer(peer *Peer) { r.peersMu.Lock() delete(r.peers, peer.ID()) metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers))) r.peersMu.Unlock() } func (r *Registry) peersCount() (c int) { r.peersMu.Lock() c = len(r.peers) r.peersMu.Unlock() return } // Run protocol run function func (r *Registry) Run(p *network.BzzPeer) error { sp := NewPeer(p.Peer, r) r.setPeer(sp) defer r.deletePeer(sp) defer close(sp.quit) defer sp.close() if r.autoRetrieval && !p.LightNode { err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top) if err != nil { return err } } return sp.Run(sp.HandleMsg) } // updateSyncing subscribes to SYNC streams by iterating over the // kademlia connections and bins. If there are existing SYNC streams // and they are no longer required after iteration, request to Quit // them will be send to appropriate peers. func (r *Registry) updateSyncing() { kad := r.delivery.kad // map of all SYNC streams for all peers // used at the and of the function to remove servers // that are not needed anymore subs := make(map[enode.ID]map[Stream]struct{}) r.peersMu.RLock() for id, peer := range r.peers { peer.serverMu.RLock() for stream := range peer.servers { if stream.Name == "SYNC" { if _, ok := subs[id]; !ok { subs[id] = make(map[Stream]struct{}) } subs[id][stream] = struct{}{} } } peer.serverMu.RUnlock() } r.peersMu.RUnlock() // request subscriptions for all nodes and bins kad.EachBin(r.addr[:], pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool { log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr, p.ID(), bin)) // bin is always less then 256 and it is safe to convert it to type uint8 stream := NewStream("SYNC", FormatSyncBinKey(uint8(bin)), true) if streams, ok := subs[p.ID()]; ok { // delete live and history streams from the map, so that it won't be removed with a Quit request delete(streams, stream) delete(streams, getHistoryStream(stream)) } err := r.RequestSubscription(p.ID(), stream, NewRange(0, 0), High) if err != nil { log.Debug("Request subscription", "err", err, "peer", p.ID(), "stream", stream) return false } return true }) // remove SYNC servers that do not need to be subscribed for id, streams := range subs { if len(streams) == 0 { continue } peer := r.getPeer(id) if peer == nil { continue } for stream := range streams { log.Debug("Remove sync server", "peer", id, "stream", stream) err := r.Quit(peer.ID(), stream) if err != nil && err != p2p.ErrShuttingDown { log.Error("quit", "err", err, "peer", peer.ID(), "stream", stream) } } } } func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := protocols.NewPeer(p, rw, Spec) bp := network.NewBzzPeer(peer) np := network.NewPeer(bp, r.delivery.kad) r.delivery.kad.On(np) defer r.delivery.kad.Off(np) return r.Run(bp) } // HandleMsg is the message handler that delegates incoming messages func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { switch msg := msg.(type) { case *SubscribeMsg: return p.handleSubscribeMsg(ctx, msg) case *SubscribeErrorMsg: return p.handleSubscribeErrorMsg(msg) case *UnsubscribeMsg: return p.handleUnsubscribeMsg(msg) case *OfferedHashesMsg: return p.handleOfferedHashesMsg(ctx, msg) case *TakeoverProofMsg: return p.handleTakeoverProofMsg(ctx, msg) case *WantedHashesMsg: return p.handleWantedHashesMsg(ctx, msg) case *ChunkDeliveryMsgRetrieval: //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) case *ChunkDeliveryMsgSyncing: //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) case *RetrieveRequestMsg: return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) case *RequestSubscriptionMsg: return p.handleRequestSubscription(ctx, msg) case *QuitMsg: return p.handleQuitMsg(msg) default: return fmt.Errorf("unknown message type: %T", msg) } } type server struct { Server stream Stream priority uint8 currentBatch []byte sessionIndex uint64 } // setNextBatch adjusts passed interval based on session index and whether // stream is live or history. It calls Server SetNextBatch with adjusted // interval and returns batch hashes and their interval. func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { if s.stream.Live { if from == 0 { from = s.sessionIndex } if to <= from || from >= s.sessionIndex { to = math.MaxUint64 } } else { if (to < from && to != 0) || from > s.sessionIndex { return nil, 0, 0, nil, nil } if to == 0 || to > s.sessionIndex { to = s.sessionIndex } } return s.SetNextBatch(from, to) } // Server interface for outgoing peer Streamer type Server interface { // SessionIndex is called when a server is initialized // to get the current cursor state of the stream data. // Based on this index, live and history stream intervals // will be adjusted before calling SetNextBatch. SessionIndex() (uint64, error) SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) GetData(context.Context, []byte) ([]byte, error) Close() } type client struct { Client stream Stream priority uint8 sessionAt uint64 to uint64 next chan error quit chan struct{} intervalsKey string intervalsStore state.Store } func peerStreamIntervalsKey(p *Peer, s Stream) string { return p.ID().String() + s.String() } func (c client) AddInterval(start, end uint64) (err error) { i := &intervals.Intervals{} err = c.intervalsStore.Get(c.intervalsKey, i) if err != nil { return err } i.Add(start, end) return c.intervalsStore.Put(c.intervalsKey, i) } func (c client) NextInterval() (start, end uint64, err error) { i := &intervals.Intervals{} err = c.intervalsStore.Get(c.intervalsKey, i) if err != nil { return 0, 0, err } start, end = i.Next() return start, end, nil } // Client interface for incoming peer Streamer type Client interface { NeedData(context.Context, []byte) func(context.Context) error BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) Close() } func (c *client) nextBatch(from uint64) (nextFrom uint64, nextTo uint64) { if c.to > 0 && from >= c.to { return 0, 0 } if c.stream.Live { return from, 0 } else if from >= c.sessionAt { if c.to > 0 { return from, c.to } return from, math.MaxUint64 } nextFrom, nextTo, err := c.NextInterval() if err != nil { log.Error("next intervals", "stream", c.stream) return } if nextTo > c.to { nextTo = c.to } if nextTo == 0 { nextTo = c.sessionAt } return } func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error { if tf := c.BatchDone(req.Stream, req.From, hashes, req.Root); tf != nil { tp, err := tf() if err != nil { return err } if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil { return err } if c.to > 0 && tp.Takeover.End >= c.to { return p.streamer.Unsubscribe(p.Peer.ID(), req.Stream) } return nil } // TODO: make a test case for testing if the interval is added when the batch is done if err := c.AddInterval(req.From, req.To); err != nil { return err } return nil } func (c *client) close() { select { case <-c.quit: default: close(c.quit) } c.Close() } // clientParams store parameters for the new client // between a subscription and initial offered hashes request handling. type clientParams struct { priority uint8 to uint64 // signal when the client is created clientCreatedC chan struct{} } func newClientParams(priority uint8, to uint64) *clientParams { return &clientParams{ priority: priority, to: to, clientCreatedC: make(chan struct{}), } } func (c *clientParams) waitClient(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() case <-c.clientCreatedC: return nil } } func (c *clientParams) clientCreated() { close(c.clientCreatedC) } // Spec is the spec of the streamer protocol var Spec = &protocols.Spec{ Name: "stream", Version: 8, MaxMsgSize: 10 * 1024 * 1024, Messages: []interface{}{ UnsubscribeMsg{}, OfferedHashesMsg{}, WantedHashesMsg{}, TakeoverProofMsg{}, SubscribeMsg{}, RetrieveRequestMsg{}, ChunkDeliveryMsgRetrieval{}, SubscribeErrorMsg{}, RequestSubscriptionMsg{}, QuitMsg{}, ChunkDeliveryMsgSyncing{}, }, } func (r *Registry) Protocols() []p2p.Protocol { return []p2p.Protocol{ { Name: Spec.Name, Version: Spec.Version, Length: Spec.Length(), Run: r.runProtocol, // NodeInfo: , // PeerInfo: , }, } } func (r *Registry) APIs() []rpc.API { return []rpc.API{ { Namespace: "stream", Version: "3.0", Service: r.api, Public: true, }, } } func (r *Registry) Start(server *p2p.Server) error { log.Info("Streamer started") return nil } func (r *Registry) Stop() error { return nil } type Range struct { From, To uint64 } func NewRange(from, to uint64) *Range { return &Range{ From: from, To: to, } } func (r *Range) String() string { return fmt.Sprintf("%v-%v", r.From, r.To) } func getHistoryPriority(priority uint8) uint8 { if priority == 0 { return 0 } return priority - 1 } func getHistoryStream(s Stream) Stream { return NewStream(s.Name, s.Key, false) } type API struct { streamer *Registry } func NewAPI(r *Registry) *API { return &API{ streamer: r, } } func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, priority uint8) error { return api.streamer.Subscribe(peerId, s, history, priority) } func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error { return api.streamer.Unsubscribe(peerId, s) }