chunk.go
package recordio

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io"

	"github.com/golang/snappy"
)

// A Chunk groups records that are dumped to storage as a unit: a Header
// followed by the optionally compressed records.  To create a chunk,
// just use ch := &Chunk{}.
type Chunk struct {
	records  [][]byte
	numBytes int // sum of record lengths.
}

func (ch *Chunk) add(record []byte) {
	ch.records = append(ch.records, record)
	ch.numBytes += len(record)
}

// dump writes the chunk to w, then clears the chunk so it is ready for
// the next add invocation.
func (ch *Chunk) dump(w io.Writer, compressorIndex int) error {
	// NOTE: check len(ch.records) rather than ch.numBytes, because
	// empty records are allowed.
	if len(ch.records) == 0 {
		return nil
	}

	// Write raw records and their lengths into data buffer.
	var data bytes.Buffer

	for _, r := range ch.records {
		var rs [4]byte
		binary.LittleEndian.PutUint32(rs[:], uint32(len(r)))

		if _, e := data.Write(rs[:]); e != nil {
			return fmt.Errorf("Failed to write record length: %v", e)
		}

		if _, e := data.Write(r); e != nil {
			return fmt.Errorf("Failed to write record: %v", e)
		}
	}

	compressed, e := compressData(&data, compressorIndex)
	if e != nil {
		return e
	}

	// Write chunk header and compressed data.
	hdr := &Header{
		checkSum:       crc32.ChecksumIEEE(compressed.Bytes()),
		compressor:     uint32(compressorIndex),
		compressedSize: uint32(compressed.Len()),
		numRecords:     uint32(len(ch.records)),
	}

	if _, e := hdr.write(w); e != nil {
		return fmt.Errorf("Failed to write chunk header: %v", e)
	}

	if _, e := w.Write(compressed.Bytes()); e != nil {
		return fmt.Errorf("Failed to write chunk data: %v", e)
	}

	// Clear the current chunk.
	ch.records = nil
	ch.numBytes = 0

	return nil
}
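
// exampleDumpChunk is an illustrative, hypothetical helper (not part of
// the original API): it sketches the intended add/dump cycle, assuming
// the package-level Snappy constant declared elsewhere in this package.
func exampleDumpChunk(w io.Writer) error {
	ch := &Chunk{}
	ch.add([]byte("hello"))
	ch.add([]byte("")) // empty records are allowed.

	// dump writes a Header followed by the compressed records, then
	// resets ch so it can accumulate the next batch of records.
	return ch.dump(w, Snappy)
}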

// noopCompressor wraps a bytes.Buffer so that it satisfies
// io.WriteCloser without performing any compression.
type noopCompressor struct {
	*bytes.Buffer
}

func (c *noopCompressor) Close() error {
	return nil
}

// compressData compresses everything read from src with the algorithm
// identified by compressorIndex and returns the compressed bytes.
func compressData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) {
	compressed := new(bytes.Buffer)
	var compressor io.WriteCloser

	switch compressorIndex {
	case NoCompression:
		compressor = &noopCompressor{compressed}
	case Snappy:
		compressor = snappy.NewBufferedWriter(compressed)
	case Gzip:
		compressor = gzip.NewWriter(compressed)
	default:
		return nil, fmt.Errorf("Unknown compression algorithm: %d", compressorIndex)
	}

	if _, e := io.Copy(compressor, src); e != nil {
		return nil, fmt.Errorf("Failed to compress chunk data: %v", e)
	}
	if e := compressor.Close(); e != nil {
		return nil, fmt.Errorf("Failed to close compressor: %v", e)
	}

	return compressed, nil
}

// parseChunk reads the chunk at chunkOffset from r, verifies its
// checksum, and decompresses its records.
func parseChunk(r io.ReadSeeker, chunkOffset int64) (*Chunk, error) {
	var e error
	var hdr *Header

	if _, e = r.Seek(chunkOffset, io.SeekStart); e != nil {
		return nil, fmt.Errorf("Failed to seek chunk: %v", e)
	}

	hdr, e = parseHeader(r)
	if e != nil {
		return nil, fmt.Errorf("Failed to parse chunk header: %v", e)
	}

	var buf bytes.Buffer
	if _, e = io.CopyN(&buf, r, int64(hdr.compressedSize)); e != nil {
		return nil, fmt.Errorf("Failed to read chunk data: %v", e)
	}

	if hdr.checkSum != crc32.ChecksumIEEE(buf.Bytes()) {
		return nil, fmt.Errorf("Checksum checking failed.")
	}

	deflated, e := deflateData(&buf, int(hdr.compressor))
	if e != nil {
		return nil, e
	}

	ch := &Chunk{}
	for i := 0; i < int(hdr.numRecords); i++ {
		var rs [4]byte
		if _, e = io.ReadFull(deflated, rs[:]); e != nil {
			return nil, fmt.Errorf("Failed to read record length: %v", e)
		}

		r := make([]byte, binary.LittleEndian.Uint32(rs[:]))
		if _, e = io.ReadFull(deflated, r); e != nil {
			return nil, fmt.Errorf("Failed to read a record: %v", e)
		}

		ch.records = append(ch.records, r)
		ch.numBytes += len(r)
	}

	return ch, nil
}
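
// exampleParseChunk is an illustrative, hypothetical helper (not part of
// the original API): it reads back a chunk that starts at offset 0 of a
// byte slice, e.g. one produced by Chunk.dump into a bytes.Buffer.
func exampleParseChunk(data []byte) ([][]byte, error) {
	ch, e := parseChunk(bytes.NewReader(data), 0)
	if e != nil {
		return nil, e
	}
	return ch.records, nil
}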

// deflateData decompresses everything read from src with the algorithm
// identified by compressorIndex and returns the decompressed bytes.
func deflateData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) {
	var e error
	var deflator io.Reader

	switch compressorIndex {
	case NoCompression:
		deflator = src
	case Snappy:
		deflator = snappy.NewReader(src)
	case Gzip:
		deflator, e = gzip.NewReader(src)
		if e != nil {
			return nil, fmt.Errorf("Failed to create gzip reader: %v", e)
		}
	default:
		return nil, fmt.Errorf("Unknown compression algorithm: %d", compressorIndex)
	}

	deflated := new(bytes.Buffer)
	if _, e = io.Copy(deflated, deflator); e != nil {
		return nil, fmt.Errorf("Failed to deflate chunk data: %v", e)
	}

	return deflated, nil
}
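
// exampleCompressRoundTrip is an illustrative, hypothetical check (not
// part of the original API): compressData followed by deflateData with
// the same compressorIndex should reproduce the input bytes.  Gzip is
// used here; Snappy and NoCompression behave the same way.
func exampleCompressRoundTrip(payload []byte) error {
	compressed, e := compressData(bytes.NewReader(payload), Gzip)
	if e != nil {
		return e
	}

	restored, e := deflateData(compressed, Gzip)
	if e != nil {
		return e
	}

	if !bytes.Equal(restored.Bytes(), payload) {
		return fmt.Errorf("round-trip mismatch")
	}
	return nil
}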