reader.go 3.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
package recordio

import "io"

// Index consists offsets and sizes of the consequetive chunks in a RecordIO file.
type Index struct {
	chunkOffsets []int64
	chunkLens    []uint32
	numRecords   int   // the number of all records in a file.
	chunkRecords []int // the number of records in chunks.
}

// LoadIndex scans the file and parse chunkOffsets, chunkLens, and len.
func LoadIndex(r io.ReadSeeker) (*Index, error) {
	f := &Index{}
	offset := int64(0)
	var e error
	var hdr *Header

	for {
		hdr, e = parseHeader(r)
		if e != nil {
			break
		}

		f.chunkOffsets = append(f.chunkOffsets, offset)
		f.chunkLens = append(f.chunkLens, hdr.numRecords)
		f.chunkRecords = append(f.chunkRecords, int(hdr.numRecords))
		f.numRecords += int(hdr.numRecords)

		offset, e = r.Seek(int64(hdr.compressedSize), io.SeekCurrent)
		if e != nil {
			break
		}
	}

	if e == io.EOF {
		return f, nil
	}
	return nil, e
}

// NumRecords returns the total number of records in a RecordIO file.
func (r *Index) NumRecords() int {
	return r.numRecords
}

// NumChunks returns the total number of chunks in a RecordIO file.
func (r *Index) NumChunks() int {
	return len(r.chunkLens)
}

// ChunkIndex return the Index of i-th Chunk.
func (r *Index) ChunkIndex(i int) *Index {
	idx := &Index{}
	idx.chunkOffsets = []int64{r.chunkOffsets[i]}
	idx.chunkLens = []uint32{r.chunkLens[i]}
	idx.chunkRecords = []int{r.chunkRecords[i]}
	idx.numRecords = idx.chunkRecords[0]
	return idx
}

// Locate returns the index of chunk that contains the given record,
// and the record index within the chunk.  It returns (-1, -1) if the
// record is out of range.
func (r *Index) Locate(recordIndex int) (int, int) {
	sum := 0
	for i, l := range r.chunkLens {
		sum += int(l)
		if recordIndex < sum {
			return i, recordIndex - sum + int(l)
		}
	}
	return -1, -1
}

// Scanner scans records in a specified range within [0, numRecords).
type Scanner struct {
	reader          io.ReadSeeker
	index           *Index
	start, end, cur int
	chunkIndex      int
	chunk           *Chunk
	err             error
}

// NewScanner creates a scanner that sequencially reads records in the
// range [start, start+len).  If start < 0, it scans from the
// beginning.  If len < 0, it scans till the end of file.
func NewScanner(r io.ReadSeeker, index *Index, start, len int) *Scanner {
	if start < 0 {
		start = 0
	}
	if len < 0 || start+len >= index.NumRecords() {
		len = index.NumRecords() - start
	}

	return &Scanner{
		reader:     r,
		index:      index,
		start:      start,
		end:        start + len,
		cur:        start - 1, // The intial status required by Scan.
		chunkIndex: -1,
		chunk:      &Chunk{},
	}
}

// Scan moves the cursor forward for one record and loads the chunk
// containing the record if not yet.
func (s *Scanner) Scan() bool {
	s.cur++

	if s.cur >= s.end {
		s.err = io.EOF
	} else {
		if ci, _ := s.index.Locate(s.cur); s.chunkIndex != ci {
			s.chunkIndex = ci
			s.chunk, s.err = parseChunk(s.reader, s.index.chunkOffsets[ci])
		}
	}

	return s.err == nil
}

// Record returns the record under the current cursor.
func (s *Scanner) Record() []byte {
	_, ri := s.index.Locate(s.cur)
	return s.chunk.records[ri]
}

// Error returns the error that stopped Scan.
func (s *Scanner) Error() error {
	return s.err
}