range_scanner.go 3.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
package recordio

import "io"

// Index consists of the offsets and record counts of the consecutive chunks
// in a RecordIO file, as collected by LoadIndex.
type Index struct {
	// chunkOffsets[i] is the offset of chunk i's header, as tracked by
	// LoadIndex while it walks the file.
	chunkOffsets []int64
	// chunkLens[i] holds the number of records in chunk i (LoadIndex fills
	// it from the chunk header's record count) -- despite the name it is
	// NOT a byte length. Locate walks these counts to find a record's chunk.
	chunkLens    []uint32
	numRecords   int   // the number of all records in a file.
	chunkRecords []int // the number of records in chunks.
}

// LoadIndex scans r chunk by chunk and builds an Index that records, for
// every chunk, the offset of its header and its number of records, plus the
// total number of records.
//
// Scanning starts at r's current position. Every recorded offset is the
// position reported by r.Seek, so offsets are consistent with one another
// even when r does not start at position 0. A clean io.EOF from parseHeader
// ends the scan successfully; any other error from parseHeader or Seek is
// returned together with a nil Index.
func LoadIndex(r io.ReadSeeker) (*Index, error) {
	// Ask r where it is instead of assuming 0: the offsets recorded for later
	// chunks come from Seek and are therefore absolute positions, so the
	// first one must be too.
	offset, err := r.Seek(0, io.SeekCurrent)
	if err != nil {
		return nil, err
	}

	idx := &Index{}
	for {
		var hdr *Header
		hdr, err = parseHeader(r)
		if err == io.EOF {
			return idx, nil
		}
		if err != nil {
			return nil, err
		}

		idx.chunkOffsets = append(idx.chunkOffsets, offset)
		// chunkLens stores the per-chunk record count (not a byte length);
		// Locate relies on that.
		idx.chunkLens = append(idx.chunkLens, hdr.numRecords)
		idx.chunkRecords = append(idx.chunkRecords, int(hdr.numRecords))
		idx.numRecords += int(hdr.numRecords)

		// Skip over the chunk payload; Seek returns the position of the
		// next chunk's header.
		offset, err = r.Seek(int64(hdr.compressedSize), io.SeekCurrent)
		if err != nil {
			return nil, err
		}
	}
}

// NumRecords reports how many records the indexed RecordIO file holds in
// total, summed over all of its chunks.
func (r *Index) NumRecords() int { return r.numRecords }

// NumChunks reports how many chunks the indexed RecordIO file contains.
func (r *Index) NumChunks() int { return len(r.chunkLens) }

// ChunkIndex returns a new single-chunk Index that describes only the i-th
// chunk of r: its offset, its record count, and (as the total) that same
// record count. It panics if i is not a valid chunk index.
func (r *Index) ChunkIndex(i int) *Index {
	return &Index{
		chunkOffsets: []int64{r.chunkOffsets[i]},
		chunkLens:    []uint32{r.chunkLens[i]},
		chunkRecords: []int{r.chunkRecords[i]},
		// A one-chunk index holds exactly that chunk's records.
		numRecords: r.chunkRecords[i],
	}
}

// Locate returns the index of the chunk that contains the given record,
// and the record's index within that chunk. It returns (-1, -1) if the
// record is out of range, including when recordIndex is negative.
//
// The lookup walks the per-chunk record counts stored in chunkLens, so each
// call costs O(number of chunks).
func (r *Index) Locate(recordIndex int) (int, int) {
	// Without this guard a negative index would satisfy "recordIndex < end"
	// for the first non-empty chunk and be reported as (0, recordIndex).
	if recordIndex < 0 {
		return -1, -1
	}

	base := 0 // global index of the first record in the current chunk
	for i, l := range r.chunkLens {
		n := int(l)
		if recordIndex < base+n {
			return i, recordIndex - base
		}
		base += n
	}
	return -1, -1
}

H
Helin Wang 已提交
77 78
// RangeScanner scans records in a specified range within [0, numRecords).
type RangeScanner struct {
79 80 81 82 83 84 85 86
	reader          io.ReadSeeker
	index           *Index
	start, end, cur int
	chunkIndex      int
	chunk           *Chunk
	err             error
}

H
Helin Wang 已提交
87
// NewRangeScanner creates a scanner that sequencially reads records in the
88 89
// range [start, start+len).  If start < 0, it scans from the
// beginning.  If len < 0, it scans till the end of file.
H
Helin Wang 已提交
90
func NewRangeScanner(r io.ReadSeeker, index *Index, start, len int) *RangeScanner {
91 92 93 94 95 96 97
	if start < 0 {
		start = 0
	}
	if len < 0 || start+len >= index.NumRecords() {
		len = index.NumRecords() - start
	}

H
Helin Wang 已提交
98
	return &RangeScanner{
99 100 101 102 103 104 105 106 107 108 109 110
		reader:     r,
		index:      index,
		start:      start,
		end:        start + len,
		cur:        start - 1, // The intial status required by Scan.
		chunkIndex: -1,
		chunk:      &Chunk{},
	}
}

// Scan moves the cursor forward for one record and loads the chunk
// containing the record if not yet.
H
Helin Wang 已提交
111
func (s *RangeScanner) Scan() bool {
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
	s.cur++

	if s.cur >= s.end {
		s.err = io.EOF
	} else {
		if ci, _ := s.index.Locate(s.cur); s.chunkIndex != ci {
			s.chunkIndex = ci
			s.chunk, s.err = parseChunk(s.reader, s.index.chunkOffsets[ci])
		}
	}

	return s.err == nil
}

// Record returns the record under the current cursor.
H
Helin Wang 已提交
127
func (s *RangeScanner) Record() []byte {
128 129 130 131
	_, ri := s.index.Locate(s.cur)
	return s.chunk.records[ri]
}

H
Helin Wang 已提交
132 133
// Err returns the first non-EOF error that was encountered by the
// Scanner.
H
Helin Wang 已提交
134
func (s *RangeScanner) Err() error {
H
Helin Wang 已提交
135 136 137 138
	if s.err == io.EOF {
		return nil
	}

139 140
	return s.err
}