提交 2929af23 编写于 作者: D DoMyJob

Add more test cases for error handling.

Signed-off-by: NDoMyJob <46307927+DoMyJob@users.noreply.github.com>
上级 32a9528d
......@@ -2,7 +2,7 @@ package opt
// properties for internal use.
var (
DebugPrint = false // Debug print
DebugPrint = true // Debug print
IgnoreStorageHeaderErr = true // Ignore storage header mismatch with config.
expendRatioM = 1.2 // Expending ratio of M (col of index table)
)
......@@ -50,6 +50,16 @@ const (
ErrorTask
)
func (response ResponseCode) String() string{
switch response{
case SuccessTask : return "SuccessTask"
case ProcessingTask : return "ProcessingTask"
case PendingTask : return "PendingTask"
case ErrorTask : return "ErrorTask"
default: return "UnkownStatus"
}
}
// TaskResponse descirbes the status of the task
type TaskResponse struct {
Status ResponseCode
......@@ -99,7 +109,6 @@ func (codec *DataRecoverEngine) startRecieveTask() {
running++
task := codec.taskList[0]
codec.taskList = codec.taskList[1:]
codec.recordTaskResponse(task, TaskResponse{PendingTask, ""})
go codec.doRecoverData(task, done)
}
}
......@@ -109,12 +118,13 @@ func (codec *DataRecoverEngine) startRecieveTask() {
func (codec *DataRecoverEngine) RecoverData(td *TaskDescription) TaskResponse {
err := codec.validateTask(td)
if err != nil {
return TaskResponse{ErrorTask, err.Error()}
codec.recordError(td, err)
return codec.RecoverStatus(td)
}
// sequenced op on chan
codec.taskCh <- td
codec.recordTaskResponse(td, TaskResponse{PendingTask, "Task is pending"})
return codec.RecoverStatus(td)
}
......@@ -257,7 +267,7 @@ func (codec *DataRecoverEngine) getShardFromNetwork(hash common.Hash, loc P2PLoc
case err := <- errCh:
return nil, err
case <- time.After(timeoutMS*time.Millisecond):
return nil, fmt.Errorf("Error: p2p get %v from %v timeout", hash, loc)
return nil, fmt.Errorf("Error: p2p get %x from %x timeout", hash, loc)
case <- stopSigCh:
return nil, nil
}
......@@ -265,7 +275,6 @@ func (codec *DataRecoverEngine) getShardFromNetwork(hash common.Hash, loc P2PLoc
func (codec *DataRecoverEngine) retrieveData(loc P2PLocation, hash common.Hash, data []byte) error {
// Read p2p network
// time.Sleep(30*time.Millisecond)
codec.p2p.RetrieveData(loc, data)
return nil
}
\ No newline at end of file
......@@ -15,9 +15,9 @@ type DataCodecOptions struct{
// DefaultRecoveryOption gives the default data recovery codec config
func DefaultRecoveryOption() *DataCodecOptions {
return &DataCodecOptions{
5,
3,
2,
4,
12,
5000,
}
}
\ No newline at end of file
......@@ -3,6 +3,7 @@ package recovery
import(
// "fmt"
// "bytes"
"time"
"github.com/ethereum/go-ethereum/common"
)
......@@ -22,6 +23,7 @@ type P2PMock struct{
// RetrieveData get data from p2p network address
func (mock P2PMock) RetrieveData(peer P2PLocation, msgByte []byte) error {
time.Sleep(50*time.Millisecond)
copy(msgByte, mock.network[peer])
return nil
}
......
......@@ -88,7 +88,6 @@ func TestDataRecovery(t *testing.T) {
}
tdList := []*TaskDescription{}
// for i:=0;i<1;i++{
for i:=0;i<len(shards);i++{
td := &TaskDescription{
uint64(i),
......@@ -116,3 +115,106 @@ func TestDataRecovery(t *testing.T) {
}
}
}
func TestMultiplyDataRecovery(t *testing.T) {
rootDir, err := ioutil.TempDir("/tmp", "ytfsTest")
config := ytfsOpt.DefaultOptions()
// defer os.Remove(config.StorageName)
yd, err := ytfs.Open(rootDir, config)
recConfig := DefaultRecoveryOption()
p2p, locs, hashes, shards := createP2PAndDistributeData(recConfig.DataShards, recConfig.ParityShards)
for i:=0;i<len(shards);i++{
fmt.Printf("Data[%d] = %x:%x\n", i, hashes[i], shards[i][:20])
}
codec, err := NewDataCodec(yd, p2p, recConfig)
if err != nil {
t.Fail()
}
td:= &TaskDescription{
uint64(2),
hashes,
locs,
[]uint32{0,1,2},
}
codec.RecoverData(td)
time.Sleep(2*time.Second)
tdStatus := codec.RecoverStatus(td)
if tdStatus.Status != SuccessTask {
t.Fatalf("ERROR: td status(%d): %s", tdStatus.Status, tdStatus.Desc)
} else {
for i:=0;i<len(td.RecoverIDs);i++{
data, err := yd.Get(ytfsCommon.IndexTableKey(td.Hashes[td.RecoverIDs[i]]))
if err != nil || bytes.Compare(data, shards[td.RecoverIDs[i]]) != 0 {
t.Fatalf("Error: err(%v), dataCompare (%d). hash(%v) data(%v) shards(%v)",
err, bytes.Compare(data, shards[td.RecoverIDs[i]]),
td.Hashes[td.RecoverIDs[i]],
data[:20], shards[td.RecoverIDs[i]][:20])
}
}
}
}
func TestDataRecoveryError(t *testing.T) {
rootDir, err := ioutil.TempDir("/tmp", "ytfsTest")
config := ytfsOpt.DefaultOptions()
// defer os.Remove(config.StorageName)
yd, err := ytfs.Open(rootDir, config)
recConfig := DefaultRecoveryOption()
recConfig.TimeoutInMS = 10
p2p, locs, hashes, shards := createP2PAndDistributeData(recConfig.DataShards, recConfig.ParityShards)
for i:=0;i<len(shards);i++{
fmt.Printf("Data[%d] = %x:%x\n", i, hashes[i], shards[i][:20])
}
codec, err := NewDataCodec(yd, p2p, recConfig)
if err != nil {
t.Fail()
}
recIds := make([]uint32, recConfig.ParityShards+1)
for i:=0;i<len(recIds);i++{
recIds[i]=uint32(i)
}
td:= &TaskDescription{
uint64(0),
hashes,
locs,
recIds,
}
codec.RecoverData(td)
tdStatus := codec.RecoverStatus(td)
if tdStatus.Status != ErrorTask {
t.Fatalf("ERROR: td status(%d): %s", tdStatus.Status, tdStatus.Desc)
} else {
t.Log("Expected error:", tdStatus)
}
td = &TaskDescription{
uint64(1),
hashes,
locs,
[]uint32{0},
}
codec.RecoverData(td)
time.Sleep(2*time.Second)
tdStatus = codec.RecoverStatus(td)
if tdStatus.Status != ErrorTask {
t.Fatalf("ERROR: td status(%d): %s", tdStatus.Status, tdStatus.Desc)
} else {
t.Log("Expected error:", tdStatus)
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册