recovery_test.go 3.0 KB
Newer Older
1 2 3 4 5
// Package recovery
package recovery

import (
	"bytes"
6
	"fmt"
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
	"io/ioutil"
	"crypto/sha256"
	"math/rand"
	"testing"
	"time"

	"github.com/klauspost/reedsolomon"
	"github.com/ethereum/go-ethereum/common"
	"github.com/yottachain/YTFS"
	ytfsOpt "github.com/yottachain/YTFS/opt"
	ytfsCommon "github.com/yottachain/YTFS/common"
)

func TestNewDataRecovery(t *testing.T) {
	_, err := NewDataCodec(nil, nil, DefaultRecoveryOption())
	if err != nil {
		t.Fail()
	}
}

func randomFill(size uint32) []byte {
	buf := make([]byte, size, size)
29 30 31
	head := make([]byte, 16, 16)
	rand.Read(head)
	copy(buf, head)
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
	return buf
}

func createShards(dataShards, parityShards int) ([]common.Hash, [][]byte) {
	shards := make([][]byte, dataShards + parityShards)
	hashes := make([]common.Hash, dataShards + parityShards)
	dataBlkSize := ytfsOpt.DefaultOptions().DataBlockSize
	for i:=0;i<dataShards;i++{
		shards[i] = randomFill(dataBlkSize)
		sum256 := sha256.Sum256(shards[i])
		hashes[i] = common.BytesToHash(sum256[:])
	}

	for i:=dataShards;i<dataShards+parityShards;i++{
		shards[i] = make([]byte, dataBlkSize)
	}

	return hashes,shards
}

52
func createP2PAndDistributeData(dataShards, parityShards int) (P2PNetwork, []P2PLocation, []common.Hash, [][]byte) {
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
	hashes, shards := createShards(dataShards, parityShards)
	locations := make([]P2PLocation, len(hashes))
	enc, _ := reedsolomon.New(dataShards, parityShards)
	enc.Encode(shards)
	//update parity hash
	for i:=dataShards;i<dataShards+parityShards;i++{
		sum256 := sha256.Sum256(shards[i])
		hashes[i] = common.BytesToHash(sum256[:])
	}

	for i:=0;i<dataShards+parityShards;i++{
		locations[i] = P2PLocation(common.BytesToAddress(hashes[i][:]))
	}

	p2p, _ := InititalP2PMock(locations, shards)
	return p2p, locations, hashes, shards
}

func TestDataRecovery(t *testing.T) {
	rootDir, err := ioutil.TempDir("/tmp", "ytfsTest")
	config := ytfsOpt.DefaultOptions()
	// defer os.Remove(config.StorageName)

	yd, err := ytfs.Open(rootDir, config)

	recConfig := DefaultRecoveryOption()
79 80 81
	p2p, locs, hashes, shards := createP2PAndDistributeData(recConfig.DataShards, recConfig.ParityShards)

	for i:=0;i<len(shards);i++{
D
DoMyJob 已提交
82
		fmt.Printf("Data[%d] = %x:%x\n", i, hashes[i], shards[i][:20])
83
	}
84 85 86 87 88 89

	codec, err := NewDataCodec(yd, p2p, recConfig)
	if err != nil {
		t.Fail()
	}

90
	tdList := []*TaskDescription{}
91 92
	// for i:=0;i<1;i++{
	for i:=0;i<len(shards);i++{
93
		td := &TaskDescription{
94
			uint64(i),
95
			hashes,
96
			locs,
97
			[]uint32{uint32(i)},
98 99 100 101 102
		}
		codec.RecoverData(td)
		tdList = append(tdList, td)
	}

103
	time.Sleep(2*time.Second)
104 105 106 107 108
	for _,td := range tdList{
		tdStatus := codec.RecoverStatus(td)
		if tdStatus.Status != SuccessTask {
			t.Fatalf("ERROR: td status(%d): %s", tdStatus.Status, tdStatus.Desc)
		} else {
109 110 111 112 113 114
			data, err := yd.Get(ytfsCommon.IndexTableKey(td.Hashes[td.RecoverIDs[0]]))
			if err != nil || bytes.Compare(data, shards[td.RecoverIDs[0]]) != 0 {
				t.Fatalf("Error: err(%v), dataCompare (%d). hash(%v) data(%v) shards(%v)",
				err, bytes.Compare(data, shards[td.RecoverIDs[0]]),
				td.Hashes[td.RecoverIDs[0]],
				data[:20], shards[td.RecoverIDs[0]][:20])
115 116 117
			}
		}
	}
118
}