提交 95931d3a 编写于 作者: D DoMyJob

Refine test cases and add few benchmarks.

Signed-off-by: NDoMyJob <46307927+DoMyJob@users.noreply.github.com>
上级 f767502a
......@@ -2,7 +2,7 @@ package opt
// properties for internal use.
var (
DebugPrint = true // Debug print
DebugPrint = false // Debug print
IgnoreStorageHeaderErr = true // Ignore storage header mismatch with config.
expendRatioM = 1.2 // Expending ratio of M (col of index table)
)
......@@ -120,7 +120,7 @@ func (codec *DataRecoverEngine) startRecieveTask() {
}
// RecoverData recieves a recovery task and start working later on
func (codec *DataRecoverEngine) RecoverData(td *TaskDescription) TaskResponse {
func (codec *DataRecoverEngine) RecoverData(td *TaskDescription, successCB... func() TaskResponse) TaskResponse {
err := codec.validateTask(td)
if err != nil {
codec.recordError(td, err)
......@@ -255,20 +255,19 @@ func (codec *DataRecoverEngine) recordTaskResponse(td *TaskDescription, res Task
func (codec *DataRecoverEngine) getShardFromNetwork(hash common.Hash, loc P2PLocation, timeoutMS time.Duration, stopSigCh chan interface{}) ([]byte, error) {
success := make(chan interface{})
errCh := make(chan error)
shard := make([]byte, codec.ytfs.Meta().DataBlockSize)
go func() {
//recieve data
err := codec.retrieveData(loc, hash, shard)
shard, err := codec.retrieveData(loc, hash)
if err != nil {
errCh <- err
} else {
success <- struct{}{}
success <- shard
}
}()
select {
case <-success:
return shard, nil
case shard := <-success:
return shard.([]byte), nil
case err := <-errCh:
return nil, err
case <-time.After(timeoutMS * time.Millisecond):
......@@ -278,8 +277,8 @@ func (codec *DataRecoverEngine) getShardFromNetwork(hash common.Hash, loc P2PLoc
}
}
func (codec *DataRecoverEngine) retrieveData(loc P2PLocation, hash common.Hash, data []byte) error {
func (codec *DataRecoverEngine) retrieveData(loc P2PLocation, hash common.Hash) ([]byte, error) {
// Read p2p network
codec.p2p.RetrieveData(loc, data)
return nil
data, _ := codec.p2p.RetrieveData(loc, hash)
return data, nil
}
......@@ -13,29 +13,43 @@ type P2PLocation common.Address
// P2PNetwork interface defines the P2P operations and implements mock for test
type P2PNetwork interface {
RetrieveData(peer P2PLocation, msgByte []byte) error
RetrieveData(peer P2PLocation, hash common.Hash) ([]byte, error)
}
// P2PMock mocks a p2p network for test
type P2PMock struct {
network map[P2PLocation][]byte
networkData map[P2PLocation][]byte
networkDelay map[P2PLocation]time.Duration
}
const defaultNetworkDelayInMS time.Duration = 50
// RetrieveData get data from p2p network address
func (mock P2PMock) RetrieveData(peer P2PLocation, msgByte []byte) error {
time.Sleep(50 * time.Millisecond)
copy(msgByte, mock.network[peer])
return nil
func (mock P2PMock) RetrieveData(peer P2PLocation, hash common.Hash) ([]byte, error) {
if delay, ok := mock.networkDelay[peer]; ok {
time.Sleep(delay * time.Millisecond)
} else {
time.Sleep(defaultNetworkDelayInMS * time.Millisecond)
}
msgByte := make([]byte, len(mock.networkData[peer]))
copy(msgByte, mock.networkData[peer])
return msgByte, nil
}
// InititalP2PMock initializes P2P mock module
func InititalP2PMock(peers []P2PLocation, dataBlks [][]byte) (P2PNetwork, error) {
func InititalP2PMock(peers []P2PLocation, dataBlks [][]byte, networkParams ...time.Duration) (P2PNetwork, error) {
p2p := P2PMock{
map[P2PLocation][]byte{},
map[P2PLocation]time.Duration{},
}
for i := 0; i < len(peers); i++ {
p2p.network[peers[i]] = dataBlks[i]
p2p.networkData[peers[i]] = dataBlks[i]
}
for i, delay := range networkParams {
p2p.networkDelay[peers[i]] = delay
}
return p2p, nil
}
......@@ -27,6 +27,7 @@ func TestNewDataRecovery(t *testing.T) {
func randomFill(size uint32) []byte {
buf := make([]byte, size, size)
head := make([]byte, 16, 16)
rand.Seed(int64(time.Now().Nanosecond()))
rand.Read(head)
copy(buf, head)
return buf
......@@ -49,9 +50,8 @@ func createShards(dataShards, parityShards int) ([]common.Hash, [][]byte) {
return hashes, shards
}
func createP2PAndDistributeData(dataShards, parityShards int) (P2PNetwork, []P2PLocation, []common.Hash, [][]byte) {
func createData(dataShards, parityShards int) ([]common.Hash, [][]byte) {
hashes, shards := createShards(dataShards, parityShards)
locations := make([]P2PLocation, len(hashes))
enc, _ := reedsolomon.New(dataShards, parityShards)
enc.Encode(shards)
//update parity hash
......@@ -60,12 +60,17 @@ func createP2PAndDistributeData(dataShards, parityShards int) (P2PNetwork, []P2P
hashes[i] = common.BytesToHash(sum256[:])
}
for i := 0; i < dataShards+parityShards; i++ {
return hashes, shards
}
func initailP2PMockWithShards(hashes []common.Hash, shards [][]byte, delays ...time.Duration) (P2PNetwork, []P2PLocation) {
locations := make([]P2PLocation, len(hashes))
for i := 0; i < len(hashes); i++ {
locations[i] = P2PLocation(common.BytesToAddress(hashes[i][:]))
}
p2p, _ := InititalP2PMock(locations, shards)
return p2p, locations, hashes, shards
p2p, _ := InititalP2PMock(locations, shards, delays...)
return p2p, locations
}
func TestDataRecovery(t *testing.T) {
......@@ -76,13 +81,14 @@ func TestDataRecovery(t *testing.T) {
yd, err := ytfs.Open(rootDir, config)
recConfig := DefaultRecoveryOption()
p2p, locs, hashes, shards := createP2PAndDistributeData(recConfig.DataShards, recConfig.ParityShards)
hashes, shards := createData(recConfig.DataShards, recConfig.ParityShards)
p2pNet, p2pNodes := initailP2PMockWithShards(hashes, shards)
for i := 0; i < len(shards); i++ {
fmt.Printf("Data[%d] = %x:%x\n", i, hashes[i], shards[i][:20])
}
codec, err := NewDataCodec(yd, p2p, recConfig)
codec, err := NewDataCodec(yd, p2pNet, recConfig)
if err != nil {
t.Fail()
}
......@@ -92,7 +98,7 @@ func TestDataRecovery(t *testing.T) {
td := &TaskDescription{
uint64(i),
hashes,
locs,
p2pNodes,
[]uint32{uint32(i)},
}
codec.RecoverData(td)
......@@ -124,13 +130,14 @@ func TestMultiplyDataRecovery(t *testing.T) {
yd, err := ytfs.Open(rootDir, config)
recConfig := DefaultRecoveryOption()
p2p, locs, hashes, shards := createP2PAndDistributeData(recConfig.DataShards, recConfig.ParityShards)
hashes, shards := createData(recConfig.DataShards, recConfig.ParityShards)
p2pNet, p2pNodes := initailP2PMockWithShards(hashes, shards)
for i := 0; i < len(shards); i++ {
fmt.Printf("Data[%d] = %x:%x\n", i, hashes[i], shards[i][:20])
}
codec, err := NewDataCodec(yd, p2p, recConfig)
codec, err := NewDataCodec(yd, p2pNet, recConfig)
if err != nil {
t.Fail()
}
......@@ -138,7 +145,7 @@ func TestMultiplyDataRecovery(t *testing.T) {
td := &TaskDescription{
uint64(2),
hashes,
locs,
p2pNodes,
[]uint32{0, 1, 2},
}
codec.RecoverData(td)
......@@ -169,13 +176,14 @@ func TestDataRecoveryError(t *testing.T) {
recConfig := DefaultRecoveryOption()
recConfig.TimeoutInMS = 10
p2p, locs, hashes, shards := createP2PAndDistributeData(recConfig.DataShards, recConfig.ParityShards)
hashes, shards := createData(recConfig.DataShards, recConfig.ParityShards)
p2pNet, p2pNodes := initailP2PMockWithShards(hashes, shards)
for i := 0; i < len(shards); i++ {
fmt.Printf("Data[%d] = %x:%x\n", i, hashes[i], shards[i][:20])
}
codec, err := NewDataCodec(yd, p2p, recConfig)
codec, err := NewDataCodec(yd, p2pNet, recConfig)
if err != nil {
t.Fail()
}
......@@ -188,7 +196,7 @@ func TestDataRecoveryError(t *testing.T) {
td := &TaskDescription{
uint64(0),
hashes,
locs,
p2pNodes,
recIds,
}
codec.RecoverData(td)
......@@ -203,7 +211,7 @@ func TestDataRecoveryError(t *testing.T) {
td = &TaskDescription{
uint64(1),
hashes,
locs,
p2pNodes,
[]uint32{0},
}
codec.RecoverData(td)
......@@ -216,9 +224,17 @@ func TestDataRecoveryError(t *testing.T) {
}
}
func BenchmarkDataRecovery(b *testing.B) {
dataShards, parityShards := 2, 3
_, _, _, shards := createP2PAndDistributeData(dataShards, parityShards)
func setupBenchmarkEnv(recConfig *DataCodecOptions, p2pDelays...time.Duration) (*DataRecoverEngine, []common.Hash, []P2PLocation) {
hashes, shards := createData(recConfig.DataShards, recConfig.ParityShards)
p2pNet, p2pNodes := initailP2PMockWithShards(hashes, shards, p2pDelays...)
codec, _ := NewDataCodec(nil, p2pNet, recConfig)
return codec, hashes, p2pNodes
}
func BenchmarkPureDataRecovery(b *testing.B) {
dataShards, parityShards := 5, 3
_, shards := createData(dataShards, parityShards)
rsEnc, err := reedsolomon.New(dataShards, parityShards)
if err != nil {
b.Fatal(err)
......@@ -230,3 +246,54 @@ func BenchmarkDataRecovery(b *testing.B) {
rsEnc.Reconstruct(shards)
}
}
func BenchmarkFastP2PDataRecovery(b *testing.B) {
recConfig := DefaultRecoveryOption()
codec, hashes, p2pNodes := setupBenchmarkEnv(recConfig, []time.Duration{250,250,250,250,250,250,250}...)
b.ResetTimer()
for n := 0; n < b.N; n++ {
done := make(chan interface{}, 1)
td := &TaskDescription{
uint64(rand.Int63()),
hashes,
p2pNodes,
[]uint32{uint32(rand.Intn(len(hashes)))},
}
codec.doRecoverData(td, done)
}
}
func BenchmarkSlowP2PDataRecovery(b *testing.B) {
recConfig := DefaultRecoveryOption()
codec, hashes, p2pNodes := setupBenchmarkEnv(recConfig, []time.Duration{25,25,25,25,25,25,25}...)
b.ResetTimer()
for n := 0; n < b.N; n++ {
done := make(chan interface{}, 1)
td := &TaskDescription{
uint64(rand.Int63()),
hashes,
p2pNodes,
[]uint32{uint32(rand.Intn(len(hashes)))},
}
codec.doRecoverData(td, done)
}
}
func BenchmarkUnevenP2PDataRecovery(b *testing.B) {
recConfig := DefaultRecoveryOption()
codec, hashes, p2pNodes := setupBenchmarkEnv(recConfig, []time.Duration{250,211,173,136,99,62,25}...)
b.ResetTimer()
for n := 0; n < b.N; n++ {
done := make(chan interface{}, 1)
td := &TaskDescription{
uint64(rand.Int63()),
hashes,
p2pNodes,
[]uint32{uint32(rand.Intn(len(hashes)))},
}
codec.doRecoverData(td, done)
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册