multi_reader.go 2.2 KB
Newer Older
H
Helin Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
package recordio

import (
	"fmt"
	"os"
	"path/filepath"
)

// MultiScanner is a scanner for multiple recordio files. It reads the files
// listed in paths one after another, keeping at most one of them open at a
// time, and exposes their records through a single Scan/Record/Err interface.
type MultiScanner struct {
	paths      []string // files to read, in order (glob-expanded by NewMultiScanner)
	curFile    *os.File // file currently open, or nil
	curScanner *Scanner // scanner over curFile, or nil when no file is attached
	pathIdx    int      // index into paths of the next file to open
	end        bool     // set once every file has been consumed
	err        error    // first error encountered; once set, Scan returns false
}

// NewMultiScanner creates a new MultiScanner over the files matched by the
// given glob patterns. Each pattern is expanded with filepath.Glob and the
// matches are concatenated in pattern order; a pattern that matches nothing
// is skipped. An error is returned if a pattern is malformed or if no
// pattern matched any file at all.
func NewMultiScanner(paths []string) (*MultiScanner, error) {
	var files []string
	for i := range paths {
		matches, globErr := filepath.Glob(paths[i])
		if globErr != nil {
			return nil, globErr
		}
		files = append(files, matches...)
	}

	if len(files) > 0 {
		return &MultiScanner{paths: files}, nil
	}
	return nil, fmt.Errorf("no valid path provided: %v", paths)
}

// Scan advances to the next record. It reads the files in order, closing
// each file once it is exhausted and then opening the next one. Scan returns
// false when every file has been consumed or when an error occurs; call Err
// to tell the two apart. Once Scan has returned false, every later call also
// returns false.
//
// If the underlying per-file scanner reports an error, Scan records it and
// returns that scanner's own result unchanged.
func (s *MultiScanner) Scan() bool {
	if s.err != nil || s.end {
		return false
	}

	// Iterate rather than recurse so that a long run of empty files cannot
	// grow the stack.
	for {
		if s.curScanner == nil {
			more, err := s.nextFile()
			if err != nil {
				s.err = err
				return false
			}
			if !more {
				s.end = true
				return false
			}
		}

		more := s.curScanner.Scan()
		if err := s.curScanner.Err(); err != nil {
			// Keep the inner scanner's verdict; the recorded error makes
			// every later call return false.
			s.err = err
			return more
		}
		if more {
			return true
		}

		// The current file is exhausted. Detach it before closing so that a
		// failed Close neither leaves a stale scanner behind for Record nor
		// makes MultiScanner.Close close the same file a second time.
		f := s.curFile
		s.curFile, s.curScanner = nil, nil
		if err := f.Close(); err != nil {
			s.err = err
			return false
		}
	}
}

// Err returns the error that Scan recorded, or nil if no error has been
// recorded. Call it after Scan returns false to distinguish a clean end of
// input from a failure.
func (s *MultiScanner) Err() error {
	err := s.err
	return err
}

// Record returns the record under the current cursor, as reported by the
// scanner of the file currently attached. It returns nil when no per-file
// scanner is attached (before the first Scan or after Close).
func (s *MultiScanner) Record() []byte {
	if cur := s.curScanner; cur != nil {
		return cur.Record()
	}
	return nil
}

// Close releases the file currently held open, if any, and detaches the
// per-file scanner. It returns the error from closing that file, or nil if
// no file was open. Calling Close again is a no-op that returns nil.
func (s *MultiScanner) Close() error {
	s.curScanner = nil
	f := s.curFile
	if f == nil {
		return nil
	}
	s.curFile = nil
	return f.Close()
}

// nextFile opens the next path in s.paths, loads its index with LoadIndex,
// and attaches a new Scanner over it as the current file. It reports false
// with a nil error once every path has been consumed. The path is consumed
// even when opening or indexing it fails; if indexing fails, the file is
// closed before the error is returned.
func (s *MultiScanner) nextFile() (bool, error) {
	if s.pathIdx >= len(s.paths) {
		return false, nil
	}

	name := s.paths[s.pathIdx]
	s.pathIdx++

	file, openErr := os.Open(name)
	if openErr != nil {
		return false, openErr
	}

	index, loadErr := LoadIndex(file)
	if loadErr != nil {
		// Best-effort cleanup: the index error is the one worth reporting.
		file.Close()
		return false, loadErr
	}

	// NOTE(review): 0, -1 presumably mean "start at the beginning, no
	// limit"; confirm against NewScanner.
	s.curFile = file
	s.curScanner = NewScanner(file, index, 0, -1)
	return true, nil
}