// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package stream

import (
	"context"
	"fmt"
	"os"
	"runtime"
	"sync"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/node"
	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/ethereum/go-ethereum/p2p/simulations"
	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/network/simulation"
	"github.com/ethereum/go-ethereum/swarm/pot"
	"github.com/ethereum/go-ethereum/swarm/state"
	"github.com/ethereum/go-ethereum/swarm/storage"
	mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
	"github.com/ethereum/go-ethereum/swarm/testutil"
)

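// MaxTimeout is the maximum time the sync tests are allowed to run
// (the value is assumed to be in seconds, following its use elsewhere).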
const MaxTimeout = 600

type synctestConfig struct {
	addrs         [][]byte
	hashes        []storage.Address
	idToChunksMap map[enode.ID][]int
	//chunksToNodesMap map[string][]int
	addrToIDMap map[string]enode.ID
}

const (
	// These simulation event types track the chunk lifecycle (created,
	// offered, wanted, delivered, arrived) and the end of the simulation
	EventTypeChunkCreated   simulations.EventType = "chunkCreated"
	EventTypeChunkOffered   simulations.EventType = "chunkOffered"
	EventTypeChunkWanted    simulations.EventType = "chunkWanted"
	EventTypeChunkDelivered simulations.EventType = "chunkDelivered"
	EventTypeChunkArrived   simulations.EventType = "chunkArrived"
	EventTypeSimTerminated  simulations.EventType = "simTerminated"
)

// Tests in this file should not request chunks from peers.
// This function panics to indicate a problem if such a request has been made.
func dummyRequestFromPeers(_ context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
	panic(fmt.Sprintf("unexpected request: address %s, source %s", req.Addr.String(), req.Source.String()))
}

//This is a syncing test for nodes.
//One node is randomly selected to be the pivot node.
//A configurable number of chunks and nodes can be
//provided to the test; the chunks are uploaded
//to the pivot node, and we check that the nodes get the chunks
//they are expected to store based on the syncing protocol.
//The number of chunks and nodes can also be provided via the command line.
func TestSyncingViaGlobalSync(t *testing.T) {
	if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
		t.Skip("Flaky on mac on travis")
	}
	//if nodes/chunks have been provided via commandline,
	//run the tests with these values
	if *nodes != 0 && *chunks != 0 {
		log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
		testSyncingViaGlobalSync(t, *chunks, *nodes)
	} else {
		var nodeCnt []int
		var chnkCnt []int
		//if the `longrunning` flag has been provided
		//run more test combinations
		if *longrunning {
			chnkCnt = []int{1, 8, 32, 256, 1024}
			nodeCnt = []int{16, 32, 64, 128, 256}
		} else {
			//default test
			chnkCnt = []int{4, 32}
			nodeCnt = []int{32, 16}
		}
		for _, chnk := range chnkCnt {
			for _, n := range nodeCnt {
				log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
				testSyncingViaGlobalSync(t, chnk, n)
			}
		}
	}
}

func TestSyncingViaDirectSubscribe(t *testing.T) {
	if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
		t.Skip("Flaky on mac on travis")
	}
	//if nodes/chunks have been provided via commandline,
	//run the tests with these values
	if *nodes != 0 && *chunks != 0 {
		log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
		err := testSyncingViaDirectSubscribe(t, *chunks, *nodes)
		if err != nil {
			t.Fatal(err)
		}
	} else {
		var nodeCnt []int
		var chnkCnt []int
		//if the `longrunning` flag has been provided
		//run more test combinations
		if *longrunning {
			chnkCnt = []int{1, 8, 32, 256, 1024}
			nodeCnt = []int{32, 16}
		} else {
			//default test
			chnkCnt = []int{4, 32}
			nodeCnt = []int{32, 16}
		}
		for _, chnk := range chnkCnt {
			for _, n := range nodeCnt {
				log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
				err := testSyncingViaDirectSubscribe(t, chnk, n)
				if err != nil {
					t.Fatal(err)
				}
			}
		}
	}
}

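// simServiceMap declares the services run by the simulation nodes:
// a single "streamer" service, constructed by streamerFunc.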
var simServiceMap = map[string]simulation.ServiceFunc{
	"streamer": streamerFunc,
}

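// streamerFunc sets up the streamer service for one simulation node:
// a local store backed by a temporary datadir, a net store, a kademlia
// overlay, and a registry with retrieval disabled and auto-subscribe syncing.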
func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
	n := ctx.Config.Node()
	addr := network.NewAddr(n)
	store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
	if err != nil {
		return nil, nil, err
	}
	bucket.Store(bucketKeyStore, store)
	localStore := store.(*storage.LocalStore)
	netStore, err := storage.NewNetStore(localStore, nil)
	if err != nil {
		return nil, nil, err
	}
	kad := network.NewKademlia(addr.Over(), network.NewKadParams())
	delivery := NewDelivery(kad, netStore)
	netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New

	r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
		Retrieval:       RetrievalDisabled,
		Syncing:         SyncingAutoSubscribe,
		SyncUpdateDelay: 3 * time.Second,
	})

	bucket.Store(bucketKeyRegistry, r)

	cleanup = func() {
		os.RemoveAll(datadir)
		netStore.Close()
		r.Close()
	}

	return r, cleanup, nil

}

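// testSyncingViaGlobalSync brings up a snapshot network of nodeCount nodes,
// watches for unexpected disconnections, and runs the chunk distribution
// check implemented in runSim.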
func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
	sim := simulation.New(simServiceMap)
	defer sim.Close()

	log.Info("Initializing test config")

	conf := &synctestConfig{}
	//map of discover ID to indexes of chunks expected at that ID
	conf.idToChunksMap = make(map[enode.ID][]int)
	//map of overlay address to discover ID
	conf.addrToIDMap = make(map[string]enode.ID)
	//array where the generated chunk hashes will be stored
	conf.hashes = make([]storage.Address, 0)

	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancelSimRun()

	if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
		t.Fatal(err)
	}

	disconnections := sim.PeerEvents(
		context.Background(),
		sim.NodeIDs(),
		simulation.NewPeerEventsFilter().Drop(),
	)

	go func() {
		for d := range disconnections {
			log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
			// t.Fatal must not be called from a goroutine other than the one
			// running the test, so report the error and cancel the simulation
			t.Error("unexpected disconnect")
			cancelSimRun()
		}
	}()

	result := runSim(conf, ctx, sim, chunkCount)

	if result.Error != nil {
		t.Fatal(result.Error)
	}
	log.Info("Simulation ended")
}

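// runSim uploads chunkCount chunks to a randomly selected node and then
// polls every node until each one holds the chunks it is expected to store,
// emitting simulation events for created and arrived chunks along the way.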
func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulation, chunkCount int) simulation.Result {

	return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
		nodeIDs := sim.UpNodeIDs()
		for _, n := range nodeIDs {
			//get the kademlia overlay address from this ID
			a := n.Bytes()
			//append it to the array of all overlay addresses
			conf.addrs = append(conf.addrs, a)
			//the proximity calculation is on overlay addr,
			//the p2p/simulations check func triggers on enode.ID,
			//so we need to know which overlay addr maps to which nodeID
			conf.addrToIDMap[string(a)] = n
		}

		//select a random up node; this is the node selected for upload
		node := sim.RandomUpNode()
		item, ok := sim.NodeItem(node.ID, bucketKeyStore)
		if !ok {
			return fmt.Errorf("No localstore")
		}
		lstore := item.(*storage.LocalStore)
		hashes, err := uploadFileToSingleNodeStore(node.ID, chunkCount, lstore)
		if err != nil {
			return err
		}
		for _, h := range hashes {
			evt := &simulations.Event{
				Type: EventTypeChunkCreated,
				Node: sim.Net.GetNode(node.ID),
				Data: h.String(),
			}
			sim.Net.Events().Send(evt)
		}
		conf.hashes = append(conf.hashes, hashes...)
		mapKeysToNodes(conf)

		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
		// or until the timeout is reached.
		var gDir string
		var globalStore *mockdb.GlobalStore
		if *useMockStore {
			gDir, globalStore, err = createGlobalStore()
			if err != nil {
				return fmt.Errorf("mock store enabled but creating the global store failed: %v", err)
			}
			defer func() {
				os.RemoveAll(gDir)
				err := globalStore.Close()
				if err != nil {
					log.Error("Error closing global store", "err", err)
				}
			}()
		}
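		// poll all nodes until every expected chunk is found in a local store,
		// or until the simulation context times out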
	REPEAT:
		for {
			for _, id := range nodeIDs {
				//for each expected chunk, check if it is in the local store
				localChunks := conf.idToChunksMap[id]
				for _, ch := range localChunks {
					//get the real chunk by the index in the index array
					chunk := conf.hashes[ch]
					log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
					//check if the expected chunk is indeed in the localstore
					var err error
					if *useMockStore {
						//use the globalStore if the mockStore should be used; in that case,
						//the complete localStore stack is bypassed for getting the chunk
						_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
					} else {
						//use the actual localstore
						item, ok := sim.NodeItem(id, bucketKeyStore)
						if !ok {
							return fmt.Errorf("Error accessing localstore")
						}
						lstore := item.(*storage.LocalStore)
						_, err = lstore.Get(ctx, chunk)
					}
					if err != nil {
						log.Debug(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
						// back off a little so we do not flood the log while waiting
						time.Sleep(500 * time.Millisecond)
						continue REPEAT
					}
					evt := &simulations.Event{
						Type: EventTypeChunkArrived,
						Node: sim.Net.GetNode(id),
						Data: chunk.String(),
					}
					sim.Net.Events().Send(evt)
					log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
				}
			}
			return nil
		}
	})
}

/*
The test generates the given number of chunks

For every chunk generated, the nearest node addresses
are identified, we verify that the nodes closer to the
chunk addresses actually do have the chunks in their local stores.

The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. The snapshot should have 'streamer' in its service list.
*/
func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error {
	sim := simulation.New(map[string]simulation.ServiceFunc{
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
			n := ctx.Config.Node()
			addr := network.NewAddr(n)
			store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
			if err != nil {
				return nil, nil, err
			}
			bucket.Store(bucketKeyStore, store)
			localStore := store.(*storage.LocalStore)
			netStore, err := storage.NewNetStore(localStore, nil)
			if err != nil {
				return nil, nil, err
			}
			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
			delivery := NewDelivery(kad, netStore)
			netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New

			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
				Retrieval: RetrievalDisabled,
				Syncing:   SyncingRegisterOnly,
			})
			bucket.Store(bucketKeyRegistry, r)

			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
			bucket.Store(bucketKeyFileStore, fileStore)

			cleanup = func() {
				os.RemoveAll(datadir)
				netStore.Close()
				r.Close()
			}

			return r, cleanup, nil

		},
	})
	defer sim.Close()

	ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancelSimRun()

	conf := &synctestConfig{}
	//map of discover ID to indexes of chunks expected at that ID
	conf.idToChunksMap = make(map[enode.ID][]int)
	//map of overlay address to discover ID
	conf.addrToIDMap = make(map[string]enode.ID)
	//array where the generated chunk hashes will be stored
	conf.hashes = make([]storage.Address, 0)

	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
	if err != nil {
		return err
	}

	if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
		return err
	}

	disconnections := sim.PeerEvents(
		context.Background(),
		sim.NodeIDs(),
		simulation.NewPeerEventsFilter().Drop(),
	)

	go func() {
		for d := range disconnections {
			log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
			// t.Fatal must not be called from a goroutine other than the one
			// running the test, so report the error and cancel the simulation
			t.Error("unexpected disconnect")
			cancelSimRun()
		}
	}()

	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
		nodeIDs := sim.UpNodeIDs()
		for _, n := range nodeIDs {
			//get the kademlia overlay address from this ID
			a := n.Bytes()
			//append it to the array of all overlay addresses
			conf.addrs = append(conf.addrs, a)
			//the proximity calculation is on overlay addr,
			//the p2p/simulations check func triggers on enode.ID,
			//so we need to know which overlay addr maps to which nodeID
			conf.addrToIDMap[string(a)] = n
		}

		var subscriptionCount int

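		//collect received stream protocol messages with code 4; in this test
		//each one is counted as one established SYNC subscription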
		filter := simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(4)
		eventC := sim.PeerEvents(ctx, nodeIDs, filter)

		for j, node := range nodeIDs {
			log.Trace(fmt.Sprintf("Start syncing subscriptions: %d", j))
			//start syncing!
			item, ok := sim.NodeItem(node, bucketKeyRegistry)
			if !ok {
				return fmt.Errorf("No registry")
			}
			registry := item.(*Registry)

			var cnt int
			cnt, err = startSyncing(registry, conf)
			if err != nil {
				return err
			}
			//increment the number of subscriptions we need to wait for
			//by the count returned from startSyncing (SYNC subscriptions)
			subscriptionCount += cnt
		}

		for e := range eventC {
			if e.Error != nil {
				return e.Error
			}
			subscriptionCount--
			if subscriptionCount == 0 {
				break
			}
		}
		//select a random node for upload
		node := sim.RandomUpNode()
		item, ok := sim.NodeItem(node.ID, bucketKeyStore)
		if !ok {
			return fmt.Errorf("No localstore")
		}
		lstore := item.(*storage.LocalStore)
		hashes, err := uploadFileToSingleNodeStore(node.ID, chunkCount, lstore)
		if err != nil {
			return err
		}
		conf.hashes = append(conf.hashes, hashes...)
		mapKeysToNodes(conf)

		if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
			return err
		}

		var gDir string
		var globalStore *mockdb.GlobalStore
		if *useMockStore {
			gDir, globalStore, err = createGlobalStore()
			if err != nil {
				return fmt.Errorf("mock store enabled but creating the global store failed: %v", err)
			}
			defer os.RemoveAll(gDir)
		}
		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
		// or until the timeout is reached.
	REPEAT:
		for {
			for _, id := range nodeIDs {
				//for each expected chunk, check if it is in the local store
				localChunks := conf.idToChunksMap[id]
				for _, ch := range localChunks {
					//get the real chunk by the index in the index array
					chunk := conf.hashes[ch]
					log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
					//check if the expected chunk is indeed in the localstore
					var err error
					if *useMockStore {
						//use the globalStore if the mockStore should be used; in that case,
						//the complete localStore stack is bypassed for getting the chunk
						_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
					} else {
						//use the actual localstore
						item, ok := sim.NodeItem(id, bucketKeyStore)
						if !ok {
							return fmt.Errorf("Error accessing localstore")
						}
						lstore := item.(*storage.LocalStore)
						_, err = lstore.Get(ctx, chunk)
					}
					if err != nil {
						log.Debug(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
						// back off a little so we do not flood the log while waiting
						time.Sleep(500 * time.Millisecond)
						continue REPEAT
					}
					log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
				}
			}
			return nil
		}
	})

	if result.Error != nil {
		return result.Error
	}

	log.Info("Simulation ended")
	return nil
}

//startSyncing is the server func to start syncing;
//it issues `RequestSubscriptionMsg` to peers, based on po, by iterating
//over the kademlia connections with `EachBin`.
//returns the number of subscriptions requested
func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
	var err error
	kad := r.delivery.kad
	subCnt := 0
	//iterate over each bin and solicit needed subscription to bins
	kad.EachBin(r.addr[:], pof, 0, func(conn *network.Peer, po int) bool {
		//identify begin and end index of the bin(s) we want to subscribe to
		subCnt++
		err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High)
		if err != nil {
			log.Error(fmt.Sprintf("Error in RequestSubscription! %v", err))
			return false
		}
		return true

	})
	return subCnt, nil
}

//map chunk keys to the addresses that are responsible for them
func mapKeysToNodes(conf *synctestConfig) {
	nodemap := make(map[string][]int)
	//build a pot for chunk hashes
	np := pot.NewPot(nil, 0)
	indexmap := make(map[string]int)
	for i, a := range conf.addrs {
		indexmap[string(a)] = i
		np, _, _ = pot.Add(np, a, pof)
	}

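	//build the peer pot map so that each node's nearest-neighbour set
	//(NNSet) can be looked up by its overlay address below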
	var kadMinProxSize = 2

	ppmap := network.NewPeerPotMap(kadMinProxSize, conf.addrs)

	//for each address, run EachNeighbour on the chunk hashes pot to identify closest nodes
	log.Trace(fmt.Sprintf("Generated hash chunk(s): %v", conf.hashes))
	for i := 0; i < len(conf.hashes); i++ {
		var a []byte
		np.EachNeighbour([]byte(conf.hashes[i]), pof, func(val pot.Val, po int) bool {
			// take the first address
			a = val.([]byte)
			return false
		})

		nns := ppmap[common.Bytes2Hex(a)].NNSet
		nns = append(nns, a)

		for _, p := range nns {
			nodemap[string(p)] = append(nodemap[string(p)], i)
		}
	}
	for addr, chunks := range nodemap {
		//this selects which chunks are expected to be found with the given node
		conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks
	}
	log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap))
}

//upload a file (as chunks) to a single local node store
func uploadFileToSingleNodeStore(id enode.ID, chunkCount int, lstore *storage.LocalStore) ([]storage.Address, error) {
	log.Debug(fmt.Sprintf("Uploading to node id: %s", id))
	fileStore := storage.NewFileStore(lstore, storage.NewFileStoreParams())
	size := chunkSize
	var rootAddrs []storage.Address
	for i := 0; i < chunkCount; i++ {
		rk, wait, err := fileStore.Store(context.TODO(), testutil.RandomReader(i, size), int64(size), false)
		if err != nil {
			return nil, err
		}
		err = wait(context.TODO())
		if err != nil {
			return nil, err
		}
		rootAddrs = append(rootAddrs, rk)
	}

	return rootAddrs, nil
}