提交 b8dc56e4 编写于 作者: D DoMyJob

Code Refine, use stop channel to stop those incomplete P2P reading.

Signed-off-by: NDoMyJob <46307927+DoMyJob@users.noreply.github.com>
上级 b9f5c5c0
......@@ -2,7 +2,7 @@ package opt
// properties for internal use.
var (
DebugPrint = true // Debug print
DebugPrint = false // Debug print
IgnoreStorageHeaderErr = true // Ignore storage header mismatch with config.
expendRatioM = 1.2 // Expending ratio of M (col of index table)
......@@ -21,7 +21,7 @@ type DataRecoverEngine struct {
p2p P2PNetwork
taskList []TaskDescription
taskList []*TaskDescription
taskCh chan *TaskDescription
taskStatus map[uint64]TaskResponse
......@@ -73,7 +73,7 @@ func NewDataCodec(ytfs *ytfs.YTFS, p2p P2PNetwork, opt *DataCodecOptions) (*Data
make(chan *TaskDescription, maxTasks),
......@@ -90,7 +90,7 @@ func (codec *DataRecoverEngine) startRecieveTask() {
// TODO: use numberred semiphone
select {
case td := <- codec.taskCh:
codec.taskList = append(codec.taskList, *td)
codec.taskList = append(codec.taskList, td)
case <- done:
......@@ -106,19 +106,19 @@ func (codec *DataRecoverEngine) startRecieveTask() {
// RecoverData recieves a recovery task and start working later on
func (codec *DataRecoverEngine) RecoverData(td TaskDescription) TaskResponse {
func (codec *DataRecoverEngine) RecoverData(td *TaskDescription) TaskResponse {
err := codec.validateTask(td)
if err != nil {
return TaskResponse{ErrorTask, err.Error()}
// sequenced op on chan
codec.taskCh <- &td
codec.taskCh <- td
return codec.RecoverStatus(td)
func (codec *DataRecoverEngine) validateTask(td TaskDescription) error {
func (codec *DataRecoverEngine) validateTask(td *TaskDescription) error {
// verify hash
if len(td.RecoverIDs) > codec.config.ParityShards {
return fmt.Errorf("Recovered data should be < ParityShards")
......@@ -132,51 +132,22 @@ func (codec *DataRecoverEngine) validateTask(td TaskDescription) error {
return nil
func (codec *DataRecoverEngine) doRecoverData(td TaskDescription, done chan interface{}) {
shardReady := make(chan interface{})
timeoutCh := make(chan common.Hash)
func (codec *DataRecoverEngine) doRecoverData(td *TaskDescription, done chan interface{}) {
if ytfsOpt.DebugPrint {
for i:=0;i<len(td.RecoverIDs);i++{
fmt.Printf("Recovery: start working on td(%d), recover hash = %v\n", td.ID, td.Hashes[td.RecoverIDs[i]])
//TODO: simplify the shards initialization
shards := make([][]byte, codec.config.DataShards+codec.config.ParityShards)
for i:=uint32(0);i<uint32(len(shards));i++{
shards[i] = make([]byte, codec.ytfs.Meta().DataBlockSize)
for i:=uint32(0);i<uint32(len(td.RecoverIDs));i++{
shards[i] = nil
for i:=0;i<len(td.Hashes);i++{
if shards[i] != nil {
go codec.getShardFromNetwork(td.Hashes[i], td.Locations[i], shards[i], codec.config.TimeoutInMS, shardReady, timeoutCh)
codec.recordTaskResponse(td, TaskResponse{ProcessingTask, "Retrieve data from P2P network"})
dataReceived := 0
for ;; {
case <-shardReady:
case hash := <-timeoutCh:
codec.recordError(td, fmt.Errorf("Retrieve %x data timeout", hash))
if dataReceived == codec.config.DataShards {
shards, err := codec.prepareDataShards(td)
if err != nil {
codec.recordError(td, err)
codec.recordTaskResponse(td, TaskResponse{ProcessingTask, "EC recovering"})
// Reconstruct the shards
err := codec.recoveryEnc.Reconstruct(shards)
err = codec.recoveryEnc.Reconstruct(shards)
if err != nil {
codec.recordError(td, err)
......@@ -192,43 +163,103 @@ func (codec *DataRecoverEngine) doRecoverData(td TaskDescription, done chan inte
codec.recordTaskResponse(td, TaskResponse{SuccessTask, ""})
codec.recordTaskResponse(td, TaskResponse{SuccessTask, "Task Success"})
done <- struct{}{}
func (codec *DataRecoverEngine) prepareDataShards(td *TaskDescription) ([][]byte, error) {
recoverIndexSet := map[uint32]interface{}{}
shards := make([][]byte, codec.config.DataShards+codec.config.ParityShards)
for i:=uint32(0);i<uint32(len(td.RecoverIDs));i++{
shards[td.RecoverIDs[i]] = nil
recoverIndexSet[td.RecoverIDs[i]] = struct{}{}
type P2PDataReceiveResult struct {
shardSliceID uint32
data []byte
resCh := make(chan *P2PDataReceiveResult, codec.config.DataShards)
errCh := make(chan error, 1)
//Stop those incompleted goroutine by using stopCh
stopSigCh := make(chan interface{})
for i:=0;i<len(td.Hashes);i++{
if _, ok := recoverIndexSet[uint32(i)]; !ok {
go func(shardID uint32) {
hash, loc, timeout := td.Hashes[shardID], td.Locations[shardID], codec.config.TimeoutInMS
data, err := codec.getShardFromNetwork(hash, loc, timeout, stopSigCh)
if err == nil {
resCh <- &P2PDataReceiveResult{shardID, data}
} else {
errCh <- err
codec.recordTaskResponse(td, TaskResponse{ProcessingTask, "Retrieving data from P2P network"})
dataReceived := 0
for ;; {
case res := <-resCh:
shards[res.shardSliceID] = res.data
case err := <-errCh:
codec.recordError(td, fmt.Errorf("ERROR: Retrieve data failed, error %v", err))
return nil, err
if dataReceived == codec.config.DataShards {
return shards, nil
// RecoverStatus queries the status of a task
func (codec *DataRecoverEngine) RecoverStatus(td TaskDescription) TaskResponse {
func (codec *DataRecoverEngine) RecoverStatus(td *TaskDescription) TaskResponse {
defer codec.lock.Unlock()
return codec.taskStatus[td.ID]
func (codec *DataRecoverEngine) recordError(td TaskDescription, err error) {
func (codec *DataRecoverEngine) recordError(td *TaskDescription, err error) {
codec.recordTaskResponse(td, TaskResponse{ErrorTask, err.Error()})
func (codec *DataRecoverEngine) recordTaskResponse(td TaskDescription, res TaskResponse) {
func (codec *DataRecoverEngine) recordTaskResponse(td *TaskDescription, res TaskResponse) {
//TODO: link to levelDB
defer codec.lock.Unlock()
codec.taskStatus[td.ID] = res
func (codec *DataRecoverEngine) getShardFromNetwork(hash common.Hash, loc P2PLocation,
shard []byte, timeoutMS time.Duration,
shardReady chan interface{}, timeoutCh chan common.Hash) {
func (codec *DataRecoverEngine) getShardFromNetwork(hash common.Hash, loc P2PLocation, timeoutMS time.Duration, stopSigCh chan interface{}) ([]byte, error) {
success := make(chan interface{})
errCh := make(chan error)
shard := make([]byte, codec.ytfs.Meta().DataBlockSize)
go func() {
//recieve data
codec.retrieveData(loc, hash, shard)
success <- struct{}{}
err := codec.retrieveData(loc, hash, shard)
if err != nil {
errCh <- err
} else {
success <- struct{}{}
select {
case <- success:
shardReady <- struct{}{}
return shard, nil
case err := <- errCh:
return nil, err
case <- time.After(timeoutMS*time.Millisecond):
timeoutCh <- hash
return nil, fmt.Errorf("Error: p2p get %v from %v timeout", hash, loc)
case <- stopSigCh:
return nil, nil
......@@ -87,10 +87,10 @@ func TestDataRecovery(t *testing.T) {
tdList := []TaskDescription{}
tdList := []*TaskDescription{}
// for i:=0;i<1;i++{
for i:=0;i<len(shards);i++{
td := TaskDescription{
td := &TaskDescription{
......@@ -100,10 +100,9 @@ func TestDataRecovery(t *testing.T) {
tdList = append(tdList, td)
for _,td := range tdList{
tdStatus := codec.RecoverStatus(td)
fmt.Println(td.ID, tdStatus)
if tdStatus.Status != SuccessTask {
t.Fatalf("ERROR: td status(%d): %s", tdStatus.Status, tdStatus.Desc)
} else {
......@@ -116,4 +115,4 @@ func TestDataRecovery(t *testing.T) {
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册