// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package stream

import (
	"context"
	crand "crypto/rand"
	"fmt"
	"io"
	"os"
	"runtime"
	"sync"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/node"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/network/simulation"
	"github.com/ethereum/go-ethereum/swarm/pot"
	"github.com/ethereum/go-ethereum/swarm/state"
	"github.com/ethereum/go-ethereum/swarm/storage"
	mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
)

const MaxTimeout = 600

type synctestConfig struct {
	addrs         [][]byte
	hashes        []storage.Address
	idToChunksMap map[discover.NodeID][]int
	//chunksToNodesMap map[string][]int
	addrToIDMap map[string]discover.NodeID
}

// Tests in this file should not request chunks from peers.
// This function panics, indicating a problem, if a request is made.
func dummyRequestFromPeers(_ context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) {
	panic(fmt.Sprintf("unexpected request: address %s, source %s", req.Addr.String(), req.Source.String()))
}

//This test is a syncing test for nodes.
//One node is randomly selected to be the pivot node.
//A configurable number of chunks and nodes can be
//provided to the test; the chunks are uploaded
//to the pivot node, and we check that the other
//nodes get the chunks they are expected to store
//based on the syncing protocol.
//The number of chunks and nodes can also be provided via the command line.
func TestSyncingViaGlobalSync(t *testing.T) {
	if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
		t.Skip("Flaky on mac on travis")
	}
	//if nodes/chunks have been provided via the command line,
	//run the tests with these values
	if *nodes != 0 && *chunks != 0 {
		log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
		testSyncingViaGlobalSync(t, *chunks, *nodes)
	} else {
		var nodeCnt []int
		var chnkCnt []int
		//if the `longrunning` flag has been provided
		//run more test combinations
		if *longrunning {
			chnkCnt = []int{1, 8, 32, 256, 1024}
			nodeCnt = []int{16, 32, 64, 128, 256}
		} else {
			//default test
			chnkCnt = []int{4, 32}
			nodeCnt = []int{32, 16}
		}
		for _, chnk := range chnkCnt {
			for _, n := range nodeCnt {
				log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", chnk, n))
				testSyncingViaGlobalSync(t, chnk, n)
			}
		}
	}
}

func TestSyncingViaDirectSubscribe(t *testing.T) {
	if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
		t.Skip("Flaky on mac on travis")
	}
	//if nodes/chunks have been provided via the command line,
	//run the tests with these values
	if *nodes != 0 && *chunks != 0 {
		log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
		err := testSyncingViaDirectSubscribe(t, *chunks, *nodes)
		if err != nil {
			t.Fatal(err)
		}
	} else {
		var nodeCnt []int
		var chnkCnt []int
		//if the `longrunning` flag has been provided
		//run more test combinations
		if *longrunning {
			chnkCnt = []int{1, 8, 32, 256, 1024}
			nodeCnt = []int{32, 16}
		} else {
			//default test
			chnkCnt = []int{4, 32}
			nodeCnt = []int{32, 16}
		}
		for _, chnk := range chnkCnt {
			for _, n := range nodeCnt {
				log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", chnk, n))
				err := testSyncingViaDirectSubscribe(t, chnk, n)
				if err != nil {
					t.Fatal(err)
				}
			}
		}
	}
}

func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
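	//Each simulated node runs a "streamer" service: a LocalStore wrapped in a
	//NetStore, a kademlia overlay and a stream Registry with DoSync enabled,
	//so syncing subscriptions are set up automatically once the network is up.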
	sim := simulation.New(map[string]simulation.ServiceFunc{
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {

			id := ctx.Config.ID
			addr := network.NewAddrFromNodeID(id)
			store, datadir, err := createTestLocalStorageForID(id, addr)
			if err != nil {
				return nil, nil, err
			}
			bucket.Store(bucketKeyStore, store)
			localStore := store.(*storage.LocalStore)
			netStore, err := storage.NewNetStore(localStore, nil)
			if err != nil {
				return nil, nil, err
			}
			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
			delivery := NewDelivery(kad, netStore)
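			//wire the NetStore fetcher to the panicking dummy above: syncing
			//must deliver every chunk without any retrieval request to peers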
			netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New

			r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
				DoSync:          true,
				SyncUpdateDelay: 3 * time.Second,
			})
			bucket.Store(bucketKeyRegistry, r)

			cleanup = func() {
				os.RemoveAll(datadir)
				netStore.Close()
				r.Close()
			}

			return r, cleanup, nil

		},
	})
	defer sim.Close()

	log.Info("Initializing test config")

	conf := &synctestConfig{}
	//map of discover ID to indexes of chunks expected at that ID
	conf.idToChunksMap = make(map[discover.NodeID][]int)
	//map of overlay address to discover ID
	conf.addrToIDMap = make(map[string]discover.NodeID)
	//array where the generated chunk hashes will be stored
	conf.hashes = make([]storage.Address, 0)

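	//load the pre-generated network topology for the given node count;
	//the snapshot file testing/snapshot_<nodeCount>.json must exist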
	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancelSimRun()

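	//wait until the kademlia tables of all nodes are considered healthy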
	if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
		t.Fatal(err)
	}

	disconnections := sim.PeerEvents(
		context.Background(),
		sim.NodeIDs(),
		simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
	)

	go func() {
		for d := range disconnections {
			log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
			//t.Fatal must not be called from a goroutine other than the test
			//goroutine, so report the failure with t.Error and cancel the run
			t.Error("unexpected disconnect")
			cancelSimRun()
		}
	}()

	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
		nodeIDs := sim.UpNodeIDs()
		for _, n := range nodeIDs {
			//get the kademlia overlay address from this ID
			a := network.ToOverlayAddr(n.Bytes())
			//append it to the array of all overlay addresses
			conf.addrs = append(conf.addrs, a)
			//the proximity calculation is on overlay addr,
			//the p2p/simulations check func triggers on discover.NodeID,
			//so we need to know which overlay addr maps to which nodeID
			conf.addrToIDMap[string(a)] = n
		}

		//select a random up node; this is the node the file is uploaded to
		node := sim.RandomUpNode()
		item, ok := sim.NodeItem(node.ID, bucketKeyStore)
		if !ok {
			return fmt.Errorf("No localstore")
		}
		lstore := item.(*storage.LocalStore)
		hashes, err := uploadFileToSingleNodeStore(node.ID, chunkCount, lstore)
		if err != nil {
			return err
		}
		conf.hashes = append(conf.hashes, hashes...)
		mapKeysToNodes(conf)

		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
		// or until the timeout is reached.
		allSuccess := false
		var gDir string
		var globalStore *mockdb.GlobalStore
		if *useMockStore {
			gDir, globalStore, err = createGlobalStore()
			if err != nil {
				return fmt.Errorf("mock store requested but creating the global store failed: %v", err)
			}
			defer func() {
				os.RemoveAll(gDir)
				err := globalStore.Close()
				if err != nil {
					log.Error("Error closing global store", "err", err)
				}
			}()
		}
		for !allSuccess {
			allSuccess = true
			for _, id := range nodeIDs {
				//for each expected chunk, check if it is in the local store
				localChunks := conf.idToChunksMap[id]
				localSuccess := true
				for _, ch := range localChunks {
					//get the real chunk by the index in the index array
					chunk := conf.hashes[ch]
					log.Trace(fmt.Sprintf("checking for chunk %s on node %s", chunk, id))
					//check if the expected chunk is indeed in the localstore
					var err error
					if *useMockStore {
						//use the globalStore if the mockStore should be used; in that case,
						//the complete localStore stack is bypassed for getting the chunk
						_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
					} else {
						//use the actual localstore
						item, ok := sim.NodeItem(id, bucketKeyStore)
						if !ok {
							return fmt.Errorf("Error accessing localstore")
						}
						lstore := item.(*storage.LocalStore)
						_, err = lstore.Get(ctx, chunk)
					}
					if err != nil {
						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
						localSuccess = false
						// Throttle the warn logging so it does not flood the output
						time.Sleep(500 * time.Millisecond)
					} else {
						log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
					}
				}
				if !localSuccess {
					allSuccess = false
					break
				}
			}
		}
		if !allSuccess {
			return fmt.Errorf("not all chunks were found on the expected nodes")
		}
		return nil
	})

	if result.Error != nil {
		t.Fatal(result.Error)
	}
	log.Info("Simulation ended")
}

/*
The test generates the given number of chunks.

For every chunk generated, the nearest node addresses
are identified; we verify that the nodes closest to the
chunk addresses actually do have the chunks in their local stores.

The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. The snapshot should have 'streamer' in its service list.
*/
func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error {
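	//unlike testSyncingViaGlobalSync, the Registry here is created without
	//RegistryOptions; SYNC subscriptions are requested explicitly via
	//startSyncing inside the simulation run below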
	sim := simulation.New(map[string]simulation.ServiceFunc{
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {

			id := ctx.Config.ID
			addr := network.NewAddrFromNodeID(id)
			store, datadir, err := createTestLocalStorageForID(id, addr)
			if err != nil {
				return nil, nil, err
			}
			bucket.Store(bucketKeyStore, store)
			localStore := store.(*storage.LocalStore)
			netStore, err := storage.NewNetStore(localStore, nil)
			if err != nil {
				return nil, nil, err
			}
			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
			delivery := NewDelivery(kad, netStore)
			netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New

			r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil)
			bucket.Store(bucketKeyRegistry, r)

			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
			bucket.Store(bucketKeyFileStore, fileStore)

			cleanup = func() {
				os.RemoveAll(datadir)
				netStore.Close()
				r.Close()
			}

			return r, cleanup, nil

		},
	})
	defer sim.Close()

	ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancelSimRun()

	conf := &synctestConfig{}
	//map of discover ID to indexes of chunks expected at that ID
	conf.idToChunksMap = make(map[discover.NodeID][]int)
	//map of overlay address to discover ID
	conf.addrToIDMap = make(map[string]discover.NodeID)
	//array where the generated chunk hashes will be stored
	conf.hashes = make([]storage.Address, 0)

	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
	if err != nil {
		return err
	}

	if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
		return err
	}

	disconnections := sim.PeerEvents(
		context.Background(),
		sim.NodeIDs(),
		simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
	)

	go func() {
		for d := range disconnections {
			log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
			t.Error("unexpected disconnect")
			cancelSimRun()
		}
	}()

	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
		nodeIDs := sim.UpNodeIDs()
		for _, n := range nodeIDs {
			//get the kademlia overlay address from this ID
			a := network.ToOverlayAddr(n.Bytes())
			//append it to the array of all overlay addresses
			conf.addrs = append(conf.addrs, a)
			//the proximity calculation is on overlay addr,
			//the p2p/simulations check func triggers on discover.NodeID,
			//so we need to know which overlay addr maps to which nodeID
			conf.addrToIDMap[string(a)] = n
		}

		var subscriptionCount int

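		//collect incoming stream-protocol messages with msg code 4; one such
		//message is expected per requested subscription, and the loop further
		//below blocks until all of them have arrived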
		filter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(4)
		eventC := sim.PeerEvents(ctx, nodeIDs, filter)

		for j, node := range nodeIDs {
			log.Trace(fmt.Sprintf("Start syncing subscriptions: %d", j))
			//start syncing!
			item, ok := sim.NodeItem(node, bucketKeyRegistry)
			if !ok {
				return fmt.Errorf("No registry")
			}
			registry := item.(*Registry)

			var cnt int
			cnt, err = startSyncing(registry, conf)
			if err != nil {
				return err
			}
			//increment the number of subscriptions we need to wait for
			//by the count returned from startSyncing (SYNC subscriptions)
			subscriptionCount += cnt
		}

		for e := range eventC {
			if e.Error != nil {
				return e.Error
			}
			subscriptionCount--
			if subscriptionCount == 0 {
				break
			}
		}
		//select a random node for upload
		node := sim.RandomUpNode()
		item, ok := sim.NodeItem(node.ID, bucketKeyStore)
		if !ok {
			return fmt.Errorf("No localstore")
		}
		lstore := item.(*storage.LocalStore)
		hashes, err := uploadFileToSingleNodeStore(node.ID, chunkCount, lstore)
		if err != nil {
			return err
		}
		conf.hashes = append(conf.hashes, hashes...)
		mapKeysToNodes(conf)

		if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
			return err
		}

		var gDir string
		var globalStore *mockdb.GlobalStore
		if *useMockStore {
			gDir, globalStore, err = createGlobalStore()
			if err != nil {
				return fmt.Errorf("mock store requested but creating the global store failed: %v", err)
			}
			defer os.RemoveAll(gDir)
		}
		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
		// or until the timeout is reached.
		allSuccess := false
		for !allSuccess {
			allSuccess = true
			for _, id := range nodeIDs {
				//for each expected chunk, check if it is in the local store
				localChunks := conf.idToChunksMap[id]
				localSuccess := true
				for _, ch := range localChunks {
					//get the real chunk by the index in the index array
					chunk := conf.hashes[ch]
					log.Trace(fmt.Sprintf("checking for chunk %s on node %s", chunk, id))
					//check if the expected chunk is indeed in the localstore
					var err error
					if *useMockStore {
						//use the globalStore if the mockStore should be used; in that case,
						//the complete localStore stack is bypassed for getting the chunk
						_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
					} else {
						//use the actual localstore
						item, ok := sim.NodeItem(id, bucketKeyStore)
						if !ok {
							return fmt.Errorf("Error accessing localstore")
						}
						lstore := item.(*storage.LocalStore)
						_, err = lstore.Get(ctx, chunk)
					}
					if err != nil {
						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
						localSuccess = false
						// Throttle the warn logging so it does not flood the output
						time.Sleep(500 * time.Millisecond)
					} else {
						log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
					}
				}
				if !localSuccess {
					allSuccess = false
					break
				}
			}
		}
		if !allSuccess {
			return fmt.Errorf("not all chunks were found on the expected nodes")
		}
		return nil
	})

	if result.Error != nil {
		return result.Error
	}

	log.Info("Simulation ended")
	return nil
}

//the server func to start syncing
//issues `RequestSubscriptionMsg` to peers, based on proximity order (po),
//by iterating over the kademlia's bins with its `EachBin` function.
//returns the number of subscriptions requested
func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
	var err error
	kad := r.delivery.kad
	subCnt := 0
	//iterate over each bin and solicit needed subscription to bins
	kad.EachBin(r.addr.Over(), pof, 0, func(conn *network.Peer, po int) bool {
		//request a subscription for each bin this peer falls into
		subCnt++
		err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High)
		if err != nil {
			log.Error(fmt.Sprintf("Error in RequestSubscription! %v", err))
			return false
		}
		return true

	})
	return subCnt, nil
}

//map chunk keys to addresses which are responsible
func mapKeysToNodes(conf *synctestConfig) {
	nodemap := make(map[string][]int)
	//build a pot of the node addresses
	np := pot.NewPot(nil, 0)
	indexmap := make(map[string]int)
	for i, a := range conf.addrs {
		indexmap[string(a)] = i
		np, _, _ = pot.Add(np, a, pof)
	}

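	//build the peer-pot map to obtain the nearest-neighbour set of every address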
	var kadMinProxSize = 2

	ppmap := network.NewPeerPotMap(kadMinProxSize, conf.addrs)

	//for each chunk hash, run EachNeighbour on the node address pot to identify the closest nodes
	log.Trace(fmt.Sprintf("Generated hash chunk(s): %v", conf.hashes))
	for i := 0; i < len(conf.hashes); i++ {
		var a []byte
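		//find the node address closest to this chunk hash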
		np.EachNeighbour([]byte(conf.hashes[i]), pof, func(val pot.Val, po int) bool {
			// take the first address
			a = val.([]byte)
			return false
		})

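		//the chunk is expected at the closest node and at all of its nearest neighbours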
		nns := ppmap[common.Bytes2Hex(a)].NNSet
		nns = append(nns, a)

		for _, p := range nns {
			nodemap[string(p)] = append(nodemap[string(p)], i)
		}
	}
	for addr, chunks := range nodemap {
		//this selects which chunks are expected to be found at the given node
		conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks
	}
	log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap))
}

//upload a file (in chunks) to a single local node store
func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int, lstore *storage.LocalStore) ([]storage.Address, error) {
	log.Debug(fmt.Sprintf("Uploading to node id: %s", id))
	fileStore := storage.NewFileStore(lstore, storage.NewFileStoreParams())
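	//each iteration stores one chunk-sized piece of random data and records
	//the returned root address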
	size := chunkSize
	var rootAddrs []storage.Address
	for i := 0; i < chunkCount; i++ {
		rk, wait, err := fileStore.Store(context.TODO(), io.LimitReader(crand.Reader, int64(size)), int64(size), false)
		if err != nil {
			return nil, err
		}
		err = wait(context.TODO())
		if err != nil {
			return nil, err
		}
		rootAddrs = append(rootAddrs, rk)
	}

	return rootAddrs, nil
}