From d0f1890db33152a816d2f52c1855af84d704c51f Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Sat, 3 Jun 2017 00:00:06 +0000 Subject: [PATCH] move recordio to github.com/PaddlePaddle/recordio --- go/cmd/master/master.go | 2 +- go/master/service.go | 2 +- go/recordio/README.md | 39 ------ go/recordio/c/CMakeLists.txt | 13 -- go/recordio/c/crecordio.go | 116 ----------------- go/recordio/c/register.go | 61 --------- go/recordio/c/test/CMakeLists.txt | 8 -- go/recordio/c/test/test.c | 56 -------- go/recordio/chunk.go | 181 -------------------------- go/recordio/header.go | 59 --------- go/recordio/range_scanner.go | 140 -------------------- go/recordio/recordio_internal_test.go | 90 ------------- go/recordio/recordio_test.go | 81 ------------ go/recordio/scanner.go | 140 -------------------- go/recordio/writer.go | 60 --------- 15 files changed, 2 insertions(+), 1046 deletions(-) delete mode 100644 go/recordio/README.md delete mode 100644 go/recordio/c/CMakeLists.txt delete mode 100644 go/recordio/c/crecordio.go delete mode 100644 go/recordio/c/register.go delete mode 100644 go/recordio/c/test/CMakeLists.txt delete mode 100644 go/recordio/c/test/test.c delete mode 100644 go/recordio/chunk.go delete mode 100644 go/recordio/header.go delete mode 100644 go/recordio/range_scanner.go delete mode 100644 go/recordio/recordio_internal_test.go delete mode 100644 go/recordio/recordio_test.go delete mode 100644 go/recordio/scanner.go delete mode 100644 go/recordio/writer.go diff --git a/go/cmd/master/master.go b/go/cmd/master/master.go index cc6e45049a3..d1f3d7d76c4 100644 --- a/go/cmd/master/master.go +++ b/go/cmd/master/master.go @@ -14,7 +14,7 @@ import ( "github.com/namsral/flag" "github.com/PaddlePaddle/Paddle/go/master" - "github.com/PaddlePaddle/Paddle/go/recordio" + "github.com/PaddlePaddle/recordio" ) func main() { diff --git a/go/master/service.go b/go/master/service.go index 50e646b01f0..ab17a62f385 100644 --- a/go/master/service.go +++ b/go/master/service.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "github.com/PaddlePaddle/Paddle/go/recordio" + "github.com/PaddlePaddle/recordio" ) const ( diff --git a/go/recordio/README.md b/go/recordio/README.md deleted file mode 100644 index 50e7e954764..00000000000 --- a/go/recordio/README.md +++ /dev/null @@ -1,39 +0,0 @@ -# RecordIO - -## Write - -```go -f, e := os.Create("a_file.recordio") -w := recordio.NewWriter(f) -w.Write([]byte("Hello")) -w.Write([]byte("World!")) -w.Close() -f.Close() -``` - -## Read - -1. Load chunk index: - - ```go - f, e := os.Open("a_file.recordio") - idx, e := recordio.LoadIndex(f) - fmt.Println("Total records: ", idx.Len()) - f.Close() - ``` - -2. Create one or more scanner to read a range of records. The - following example reads the range - [1, 3), i.e., the second and the third records: - - ```go - f, e := os.Open("a_file.recordio") - s := recrodio.NewScanner(f, idx, 1, 3) - for s.Scan() { - fmt.Println(string(s.Record())) - } - if s.Err() != nil { - log.Fatalf("Something wrong with scanning: %v", e) - } - f.Close() - ``` diff --git a/go/recordio/c/CMakeLists.txt b/go/recordio/c/CMakeLists.txt deleted file mode 100644 index c300c091f87..00000000000 --- a/go/recordio/c/CMakeLists.txt +++ /dev/null @@ -1,13 +0,0 @@ -cmake_minimum_required(VERSION 3.0) - -get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) -get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY) -set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PARENT_DIR}/cmake") - -project(cxx_go C Go) - -include(golang) -include(flags) - -go_library(recordio STATIC) -add_subdirectory(test) diff --git a/go/recordio/c/crecordio.go b/go/recordio/c/crecordio.go deleted file mode 100644 index e5cc3029928..00000000000 --- a/go/recordio/c/crecordio.go +++ /dev/null @@ -1,116 +0,0 @@ -package main - -/* -#include - -typedef int reader; -typedef int writer; -*/ -import "C" - -import ( - "log" - "os" - "strings" - "unsafe" - - "github.com/PaddlePaddle/Paddle/go/recordio" -) - -var nullPtr = unsafe.Pointer(uintptr(0)) - -type writer struct { - w *recordio.Writer - f *os.File -} - -type reader struct { - scanner *recordio.Scanner -} - -func cArrayToSlice(p unsafe.Pointer, len int) []byte { - if p == nullPtr { - return nil - } - - // create a Go clice backed by a C array, reference: - // https://github.com/golang/go/wiki/cgo#turning-c-arrays-into-go-slices - // - // Go garbage collector will not interact with this data, need - // to be freed properly. - return (*[1 << 30]byte)(p)[:len:len] -} - -//export create_recordio_writer -func create_recordio_writer(path *C.char) C.writer { - p := C.GoString(path) - f, err := os.Create(p) - if err != nil { - log.Println(err) - return -1 - } - - w := recordio.NewWriter(f, -1, -1) - writer := &writer{f: f, w: w} - return addWriter(writer) -} - -//export recordio_write -func recordio_write(writer C.writer, buf *C.uchar, size C.int) C.int { - w := getWriter(writer) - b := cArrayToSlice(unsafe.Pointer(buf), int(size)) - c, err := w.w.Write(b) - if err != nil { - log.Println(err) - return -1 - } - - return C.int(c) -} - -//export release_recordio_writer -func release_recordio_writer(writer C.writer) { - w := removeWriter(writer) - w.w.Close() - w.f.Close() -} - -//export create_recordio_reader -func create_recordio_reader(path *C.char) C.reader { - p := C.GoString(path) - s, err := recordio.NewScanner(strings.Split(p, ",")...) - if err != nil { - log.Println(err) - return -1 - } - - r := &reader{scanner: s} - return addReader(r) -} - -//export recordio_read -func recordio_read(reader C.reader, record **C.uchar) C.int { - r := getReader(reader) - if r.scanner.Scan() { - buf := r.scanner.Record() - if len(buf) == 0 { - *record = (*C.uchar)(nullPtr) - return 0 - } - - size := C.int(len(buf)) - *record = (*C.uchar)(C.malloc(C.size_t(len(buf)))) - C.memcpy(unsafe.Pointer(*record), unsafe.Pointer(&buf[0]), C.size_t(len(buf))) - return size - } - - return -1 -} - -//export release_recordio_reader -func release_recordio_reader(reader C.reader) { - r := removeReader(reader) - r.scanner.Close() -} - -func main() {} // Required but ignored diff --git a/go/recordio/c/register.go b/go/recordio/c/register.go deleted file mode 100644 index 61dfdbd4ab6..00000000000 --- a/go/recordio/c/register.go +++ /dev/null @@ -1,61 +0,0 @@ -package main - -/* -typedef int reader; -typedef int writer; -*/ -import "C" - -import "sync" - -var mu sync.Mutex -var handleMap = make(map[C.reader]*reader) -var curHandle C.reader -var writerMap = make(map[C.writer]*writer) -var curWriterHandle C.writer - -func addReader(r *reader) C.reader { - mu.Lock() - defer mu.Unlock() - reader := curHandle - curHandle++ - handleMap[reader] = r - return reader -} - -func getReader(reader C.reader) *reader { - mu.Lock() - defer mu.Unlock() - return handleMap[reader] -} - -func removeReader(reader C.reader) *reader { - mu.Lock() - defer mu.Unlock() - r := handleMap[reader] - delete(handleMap, reader) - return r -} - -func addWriter(w *writer) C.writer { - mu.Lock() - defer mu.Unlock() - writer := curWriterHandle - curWriterHandle++ - writerMap[writer] = w - return writer -} - -func getWriter(writer C.writer) *writer { - mu.Lock() - defer mu.Unlock() - return writerMap[writer] -} - -func removeWriter(writer C.writer) *writer { - mu.Lock() - defer mu.Unlock() - w := writerMap[writer] - delete(writerMap, writer) - return w -} diff --git a/go/recordio/c/test/CMakeLists.txt b/go/recordio/c/test/CMakeLists.txt deleted file mode 100644 index bac1006ae12..00000000000 --- a/go/recordio/c/test/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -cmake_minimum_required(VERSION 3.0) - -include_directories(${CMAKE_BINARY_DIR}) - -add_executable(recordio_test test.c) -add_dependencies(recordio_test recordio) -set (CMAKE_EXE_LINKER_FLAGS "-pthread") -target_link_libraries(recordio_test ${CMAKE_BINARY_DIR}/librecordio.a) diff --git a/go/recordio/c/test/test.c b/go/recordio/c/test/test.c deleted file mode 100644 index b25536a9d76..00000000000 --- a/go/recordio/c/test/test.c +++ /dev/null @@ -1,56 +0,0 @@ -#include -#include - -#include "librecordio.h" - -void fail() { - // TODO(helin): fix: gtest using cmake is not working, using this - // hacky way for now. - printf("test failed.\n"); - exit(-1); -} - -int main() { - writer w = create_recordio_writer("/tmp/test_recordio_0"); - recordio_write(w, "hello", 6); - recordio_write(w, "hi", 3); - release_recordio_writer(w); - - w = create_recordio_writer("/tmp/test_recordio_1"); - recordio_write(w, "dog", 4); - recordio_write(w, "cat", 4); - release_recordio_writer(w); - - reader r = create_recordio_reader("/tmp/test_recordio_*"); - unsigned char* item = NULL; - int size = recordio_read(r, &item); - if (strcmp(item, "hello") || size != 6) { - fail(); - } - free(item); - - size = recordio_read(r, &item); - if (strcmp(item, "hi") || size != 3) { - fail(); - } - free(item); - - size = recordio_read(r, &item); - if (strcmp(item, "dog") || size != 4) { - fail(); - } - free(item); - - size = recordio_read(r, &item); - if (strcmp(item, "cat") || size != 4) { - fail(); - } - free(item); - - size = recordio_read(r, &item); - if (size != -1) { - fail(); - } - - release_recordio_reader(r); -} diff --git a/go/recordio/chunk.go b/go/recordio/chunk.go deleted file mode 100644 index 4e983ab72bd..00000000000 --- a/go/recordio/chunk.go +++ /dev/null @@ -1,181 +0,0 @@ -package recordio - -import ( - "bytes" - "compress/gzip" - "encoding/binary" - "fmt" - "hash/crc32" - "io" - - "github.com/golang/snappy" -) - -// A Chunk contains the Header and optionally compressed records. To -// create a chunk, just use ch := &Chunk{}. -type Chunk struct { - records [][]byte - numBytes int // sum of record lengths. -} - -func (ch *Chunk) add(record []byte) { - ch.records = append(ch.records, record) - ch.numBytes += len(record) -} - -// dump the chunk into w, and clears the chunk and makes it ready for -// the next add invocation. -func (ch *Chunk) dump(w io.Writer, compressorIndex int) error { - // NOTE: don't check ch.numBytes instead, because empty - // records are allowed. - if len(ch.records) == 0 { - return nil - } - - // Write raw records and their lengths into data buffer. - var data bytes.Buffer - - for _, r := range ch.records { - var rs [4]byte - binary.LittleEndian.PutUint32(rs[:], uint32(len(r))) - - if _, e := data.Write(rs[:]); e != nil { - return fmt.Errorf("Failed to write record length: %v", e) - } - - if _, e := data.Write(r); e != nil { - return fmt.Errorf("Failed to write record: %v", e) - } - } - - compressed, e := compressData(&data, compressorIndex) - if e != nil { - return e - } - - // Write chunk header and compressed data. - hdr := &Header{ - checkSum: crc32.ChecksumIEEE(compressed.Bytes()), - compressor: uint32(compressorIndex), - compressedSize: uint32(compressed.Len()), - numRecords: uint32(len(ch.records)), - } - - if _, e := hdr.write(w); e != nil { - return fmt.Errorf("Failed to write chunk header: %v", e) - } - - if _, e := w.Write(compressed.Bytes()); e != nil { - return fmt.Errorf("Failed to write chunk data: %v", e) - } - - // Clear the current chunk. - ch.records = nil - ch.numBytes = 0 - - return nil -} - -type noopCompressor struct { - *bytes.Buffer -} - -func (c *noopCompressor) Close() error { - return nil -} - -func compressData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) { - compressed := new(bytes.Buffer) - var compressor io.WriteCloser - - switch compressorIndex { - case NoCompression: - compressor = &noopCompressor{compressed} - case Snappy: - compressor = snappy.NewBufferedWriter(compressed) - case Gzip: - compressor = gzip.NewWriter(compressed) - default: - return nil, fmt.Errorf("Unknown compression algorithm: %d", compressorIndex) - } - - if _, e := io.Copy(compressor, src); e != nil { - return nil, fmt.Errorf("Failed to compress chunk data: %v", e) - } - compressor.Close() - - return compressed, nil -} - -// parse the specified chunk from r. -func parseChunk(r io.ReadSeeker, chunkOffset int64) (*Chunk, error) { - var e error - var hdr *Header - - if _, e = r.Seek(chunkOffset, io.SeekStart); e != nil { - return nil, fmt.Errorf("Failed to seek chunk: %v", e) - } - - hdr, e = parseHeader(r) - if e != nil { - return nil, fmt.Errorf("Failed to parse chunk header: %v", e) - } - - var buf bytes.Buffer - if _, e = io.CopyN(&buf, r, int64(hdr.compressedSize)); e != nil { - return nil, fmt.Errorf("Failed to read chunk data: %v", e) - } - - if hdr.checkSum != crc32.ChecksumIEEE(buf.Bytes()) { - return nil, fmt.Errorf("Checksum checking failed.") - } - - deflated, e := deflateData(&buf, int(hdr.compressor)) - if e != nil { - return nil, e - } - - ch := &Chunk{} - for i := 0; i < int(hdr.numRecords); i++ { - var rs [4]byte - if _, e = deflated.Read(rs[:]); e != nil { - return nil, fmt.Errorf("Failed to read record length: %v", e) - } - - r := make([]byte, binary.LittleEndian.Uint32(rs[:])) - if _, e = deflated.Read(r); e != nil { - return nil, fmt.Errorf("Failed to read a record: %v", e) - } - - ch.records = append(ch.records, r) - ch.numBytes += len(r) - } - - return ch, nil -} - -func deflateData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) { - var e error - var deflator io.Reader - - switch compressorIndex { - case NoCompression: - deflator = src - case Snappy: - deflator = snappy.NewReader(src) - case Gzip: - deflator, e = gzip.NewReader(src) - if e != nil { - return nil, fmt.Errorf("Failed to create gzip reader: %v", e) - } - default: - return nil, fmt.Errorf("Unknown compression algorithm: %d", compressorIndex) - } - - deflated := new(bytes.Buffer) - if _, e = io.Copy(deflated, deflator); e != nil { - return nil, fmt.Errorf("Failed to deflate chunk data: %v", e) - } - - return deflated, nil -} diff --git a/go/recordio/header.go b/go/recordio/header.go deleted file mode 100644 index d3aefae3646..00000000000 --- a/go/recordio/header.go +++ /dev/null @@ -1,59 +0,0 @@ -package recordio - -import ( - "encoding/binary" - "fmt" - "io" -) - -const ( - // NoCompression means writing raw chunk data into files. - // With other choices, chunks are compressed before written. - NoCompression = iota - // Snappy had been the default compressing algorithm widely - // used in Google. It compromises between speech and - // compression ratio. - Snappy - // Gzip is a well-known compression algorithm. It is - // recommmended only you are looking for compression ratio. - Gzip - - magicNumber uint32 = 0x01020304 - defaultCompressor = Snappy -) - -// Header is the metadata of Chunk. -type Header struct { - checkSum uint32 - compressor uint32 - compressedSize uint32 - numRecords uint32 -} - -func (c *Header) write(w io.Writer) (int, error) { - var buf [20]byte - binary.LittleEndian.PutUint32(buf[0:4], magicNumber) - binary.LittleEndian.PutUint32(buf[4:8], c.checkSum) - binary.LittleEndian.PutUint32(buf[8:12], c.compressor) - binary.LittleEndian.PutUint32(buf[12:16], c.compressedSize) - binary.LittleEndian.PutUint32(buf[16:20], c.numRecords) - return w.Write(buf[:]) -} - -func parseHeader(r io.Reader) (*Header, error) { - var buf [20]byte - if _, e := r.Read(buf[:]); e != nil { - return nil, e - } - - if v := binary.LittleEndian.Uint32(buf[0:4]); v != magicNumber { - return nil, fmt.Errorf("Failed to parse magic number") - } - - return &Header{ - checkSum: binary.LittleEndian.Uint32(buf[4:8]), - compressor: binary.LittleEndian.Uint32(buf[8:12]), - compressedSize: binary.LittleEndian.Uint32(buf[12:16]), - numRecords: binary.LittleEndian.Uint32(buf[16:20]), - }, nil -} diff --git a/go/recordio/range_scanner.go b/go/recordio/range_scanner.go deleted file mode 100644 index 46e2eee68c7..00000000000 --- a/go/recordio/range_scanner.go +++ /dev/null @@ -1,140 +0,0 @@ -package recordio - -import "io" - -// Index consists offsets and sizes of the consequetive chunks in a RecordIO file. -type Index struct { - chunkOffsets []int64 - chunkLens []uint32 - numRecords int // the number of all records in a file. - chunkRecords []int // the number of records in chunks. -} - -// LoadIndex scans the file and parse chunkOffsets, chunkLens, and len. -func LoadIndex(r io.ReadSeeker) (*Index, error) { - f := &Index{} - offset := int64(0) - var e error - var hdr *Header - - for { - hdr, e = parseHeader(r) - if e != nil { - break - } - - f.chunkOffsets = append(f.chunkOffsets, offset) - f.chunkLens = append(f.chunkLens, hdr.numRecords) - f.chunkRecords = append(f.chunkRecords, int(hdr.numRecords)) - f.numRecords += int(hdr.numRecords) - - offset, e = r.Seek(int64(hdr.compressedSize), io.SeekCurrent) - if e != nil { - break - } - } - - if e == io.EOF { - return f, nil - } - return nil, e -} - -// NumRecords returns the total number of records in a RecordIO file. -func (r *Index) NumRecords() int { - return r.numRecords -} - -// NumChunks returns the total number of chunks in a RecordIO file. -func (r *Index) NumChunks() int { - return len(r.chunkLens) -} - -// ChunkIndex return the Index of i-th Chunk. -func (r *Index) ChunkIndex(i int) *Index { - idx := &Index{} - idx.chunkOffsets = []int64{r.chunkOffsets[i]} - idx.chunkLens = []uint32{r.chunkLens[i]} - idx.chunkRecords = []int{r.chunkRecords[i]} - idx.numRecords = idx.chunkRecords[0] - return idx -} - -// Locate returns the index of chunk that contains the given record, -// and the record index within the chunk. It returns (-1, -1) if the -// record is out of range. -func (r *Index) Locate(recordIndex int) (int, int) { - sum := 0 - for i, l := range r.chunkLens { - sum += int(l) - if recordIndex < sum { - return i, recordIndex - sum + int(l) - } - } - return -1, -1 -} - -// RangeScanner scans records in a specified range within [0, numRecords). -type RangeScanner struct { - reader io.ReadSeeker - index *Index - start, end, cur int - chunkIndex int - chunk *Chunk - err error -} - -// NewRangeScanner creates a scanner that sequencially reads records in the -// range [start, start+len). If start < 0, it scans from the -// beginning. If len < 0, it scans till the end of file. -func NewRangeScanner(r io.ReadSeeker, index *Index, start, len int) *RangeScanner { - if start < 0 { - start = 0 - } - if len < 0 || start+len >= index.NumRecords() { - len = index.NumRecords() - start - } - - return &RangeScanner{ - reader: r, - index: index, - start: start, - end: start + len, - cur: start - 1, // The intial status required by Scan. - chunkIndex: -1, - chunk: &Chunk{}, - } -} - -// Scan moves the cursor forward for one record and loads the chunk -// containing the record if not yet. -func (s *RangeScanner) Scan() bool { - s.cur++ - - if s.cur >= s.end { - s.err = io.EOF - } else { - if ci, _ := s.index.Locate(s.cur); s.chunkIndex != ci { - s.chunkIndex = ci - s.chunk, s.err = parseChunk(s.reader, s.index.chunkOffsets[ci]) - } - } - - return s.err == nil -} - -// Record returns the record under the current cursor. -func (s *RangeScanner) Record() []byte { - _, ri := s.index.Locate(s.cur) - return s.chunk.records[ri] -} - -// Err returns the first non-EOF error that was encountered by the -// Scanner. -func (s *RangeScanner) Err() error { - if s.err == io.EOF { - return nil - } - - return s.err -} diff --git a/go/recordio/recordio_internal_test.go b/go/recordio/recordio_internal_test.go deleted file mode 100644 index 30e317925d8..00000000000 --- a/go/recordio/recordio_internal_test.go +++ /dev/null @@ -1,90 +0,0 @@ -package recordio - -import ( - "bytes" - "testing" - "unsafe" - - "github.com/stretchr/testify/assert" -) - -func TestChunkHead(t *testing.T) { - assert := assert.New(t) - - c := &Header{ - checkSum: 123, - compressor: 456, - compressedSize: 789, - } - - var buf bytes.Buffer - _, e := c.write(&buf) - assert.Nil(e) - - cc, e := parseHeader(&buf) - assert.Nil(e) - assert.Equal(c, cc) -} - -func TestWriteAndRead(t *testing.T) { - assert := assert.New(t) - - data := []string{ - "12345", - "1234", - "12"} - - var buf bytes.Buffer - w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize. - - n, e := w.Write([]byte(data[0])) // not exceed chunk size. - assert.Nil(e) - assert.Equal(5, n) - - n, e = w.Write([]byte(data[1])) // not exceed chunk size. - assert.Nil(e) - assert.Equal(4, n) - - n, e = w.Write([]byte(data[2])) // exeeds chunk size, dump and create a new chunk. - assert.Nil(e) - assert.Equal(n, 2) - - assert.Nil(w.Close()) // flush the second chunk. - assert.Nil(w.Writer) - - n, e = w.Write([]byte("anything")) // not effective after close. - assert.NotNil(e) - assert.Equal(n, 0) - - idx, e := LoadIndex(bytes.NewReader(buf.Bytes())) - assert.Nil(e) - assert.Equal([]uint32{2, 1}, idx.chunkLens) - assert.Equal( - []int64{0, - int64(4 + // magic number - unsafe.Sizeof(Header{}) + - 5 + // first record - 4 + // second record - 2*4)}, // two record legnths - idx.chunkOffsets) - - s := NewRangeScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1) - i := 0 - for s.Scan() { - assert.Equal(data[i], string(s.Record())) - i++ - } -} - -func TestWriteEmptyFile(t *testing.T) { - assert := assert.New(t) - - var buf bytes.Buffer - w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize. - assert.Nil(w.Close()) - assert.Equal(0, buf.Len()) - - idx, e := LoadIndex(bytes.NewReader(buf.Bytes())) - assert.Nil(e) - assert.Equal(0, idx.NumRecords()) -} diff --git a/go/recordio/recordio_test.go b/go/recordio/recordio_test.go deleted file mode 100644 index e4ef835afa6..00000000000 --- a/go/recordio/recordio_test.go +++ /dev/null @@ -1,81 +0,0 @@ -package recordio_test - -import ( - "bytes" - "reflect" - "testing" - - "github.com/PaddlePaddle/Paddle/go/recordio" -) - -func TestWriteRead(t *testing.T) { - const total = 1000 - var buf bytes.Buffer - w := recordio.NewWriter(&buf, 0, -1) - for i := 0; i < total; i++ { - _, err := w.Write(make([]byte, i)) - if err != nil { - t.Fatal(err) - } - } - w.Close() - - idx, err := recordio.LoadIndex(bytes.NewReader(buf.Bytes())) - if err != nil { - t.Fatal(err) - } - - if idx.NumRecords() != total { - t.Fatal("num record does not match:", idx.NumRecords(), total) - } - - s := recordio.NewRangeScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1) - i := 0 - for s.Scan() { - if !reflect.DeepEqual(s.Record(), make([]byte, i)) { - t.Fatal("not equal:", len(s.Record()), len(make([]byte, i))) - } - i++ - } - - if i != total { - t.Fatal("total count not match:", i, total) - } -} - -func TestChunkIndex(t *testing.T) { - const total = 1000 - var buf bytes.Buffer - w := recordio.NewWriter(&buf, 0, -1) - for i := 0; i < total; i++ { - _, err := w.Write(make([]byte, i)) - if err != nil { - t.Fatal(err) - } - } - w.Close() - - idx, err := recordio.LoadIndex(bytes.NewReader(buf.Bytes())) - if err != nil { - t.Fatal(err) - } - - if idx.NumChunks() != total { - t.Fatal("unexpected chunk num:", idx.NumChunks(), total) - } - - for i := 0; i < total; i++ { - newIdx := idx.ChunkIndex(i) - s := recordio.NewRangeScanner(bytes.NewReader(buf.Bytes()), newIdx, -1, -1) - j := 0 - for s.Scan() { - if !reflect.DeepEqual(s.Record(), make([]byte, i)) { - t.Fatal("not equal:", len(s.Record()), len(make([]byte, i))) - } - j++ - } - if j != 1 { - t.Fatal("unexpected record per chunk:", j) - } - } -} diff --git a/go/recordio/scanner.go b/go/recordio/scanner.go deleted file mode 100644 index 865228ff651..00000000000 --- a/go/recordio/scanner.go +++ /dev/null @@ -1,140 +0,0 @@ -package recordio - -import ( - "fmt" - "os" - "path/filepath" -) - -// Scanner is a scanner for multiple recordio files. -type Scanner struct { - paths []string - curFile *os.File - curScanner *RangeScanner - pathIdx int - end bool - err error -} - -// NewScanner creates a new Scanner. -func NewScanner(paths ...string) (*Scanner, error) { - var ps []string - for _, s := range paths { - match, err := filepath.Glob(s) - if err != nil { - return nil, err - } - - ps = append(ps, match...) - } - - if len(ps) == 0 { - return nil, fmt.Errorf("no valid path provided: %v", paths) - } - - return &Scanner{paths: ps}, nil -} - -// Scan moves the cursor forward for one record and loads the chunk -// containing the record if not yet. -func (s *Scanner) Scan() bool { - if s.err != nil { - return false - } - - if s.end { - return false - } - - if s.curScanner == nil { - more, err := s.nextFile() - if err != nil { - s.err = err - return false - } - - if !more { - s.end = true - return false - } - } - - curMore := s.curScanner.Scan() - s.err = s.curScanner.Err() - - if s.err != nil { - return curMore - } - - if !curMore { - err := s.curFile.Close() - if err != nil { - s.err = err - return false - } - s.curFile = nil - - more, err := s.nextFile() - if err != nil { - s.err = err - return false - } - - if !more { - s.end = true - return false - } - - return s.Scan() - } - return true -} - -// Err returns the first non-EOF error that was encountered by the -// Scanner. -func (s *Scanner) Err() error { - return s.err -} - -// Record returns the record under the current cursor. -func (s *Scanner) Record() []byte { - if s.curScanner == nil { - return nil - } - - return s.curScanner.Record() -} - -// Close release the resources. -func (s *Scanner) Close() error { - s.curScanner = nil - if s.curFile != nil { - err := s.curFile.Close() - s.curFile = nil - return err - } - return nil -} - -func (s *Scanner) nextFile() (bool, error) { - if s.pathIdx >= len(s.paths) { - return false, nil - } - - path := s.paths[s.pathIdx] - s.pathIdx++ - f, err := os.Open(path) - if err != nil { - return false, err - } - - idx, err := LoadIndex(f) - if err != nil { - f.Close() - return false, err - } - - s.curFile = f - s.curScanner = NewRangeScanner(f, idx, 0, -1) - return true, nil -} diff --git a/go/recordio/writer.go b/go/recordio/writer.go deleted file mode 100644 index 39112e518fb..00000000000 --- a/go/recordio/writer.go +++ /dev/null @@ -1,60 +0,0 @@ -package recordio - -import ( - "fmt" - "io" -) - -const ( - defaultMaxChunkSize = 32 * 1024 * 1024 -) - -// Writer creates a RecordIO file. -type Writer struct { - io.Writer // Set to nil to mark a closed writer. - chunk *Chunk - maxChunkSize int // total records size, excluding metadata, before compression. - compressor int -} - -// NewWriter creates a RecordIO file writer. Each chunk is compressed -// using the deflate algorithm given compression level. Note that -// level 0 means no compression and -1 means default compression. -func NewWriter(w io.Writer, maxChunkSize, compressor int) *Writer { - if maxChunkSize < 0 { - maxChunkSize = defaultMaxChunkSize - } - - if compressor < 0 { - compressor = defaultCompressor - } - - return &Writer{ - Writer: w, - chunk: &Chunk{}, - maxChunkSize: maxChunkSize, - compressor: compressor} -} - -// Writes a record. It returns an error if Close has been called. -func (w *Writer) Write(record []byte) (int, error) { - if w.Writer == nil { - return 0, fmt.Errorf("Cannot write since writer had been closed") - } - - if w.chunk.numBytes+len(record) > w.maxChunkSize { - if e := w.chunk.dump(w.Writer, w.compressor); e != nil { - return 0, e - } - } - - w.chunk.add(record) - return len(record), nil -} - -// Close flushes the current chunk and makes the writer invalid. -func (w *Writer) Close() error { - e := w.chunk.dump(w.Writer, w.compressor) - w.Writer = nil - return e -} -- GitLab