提交 2fa274cf 编写于 作者: H Helin Wang

fix according to comments

上级 ec5db380
...@@ -25,7 +25,7 @@ type writer struct { ...@@ -25,7 +25,7 @@ type writer struct {
} }
type reader struct { type reader struct {
scanner *recordio.MultiScanner scanner *recordio.Scanner
} }
func cArrayToSlice(p unsafe.Pointer, len int) []byte { func cArrayToSlice(p unsafe.Pointer, len int) []byte {
...@@ -55,21 +55,21 @@ func create_recordio_writer(path *C.char) C.writer { ...@@ -55,21 +55,21 @@ func create_recordio_writer(path *C.char) C.writer {
return addWriter(writer) return addWriter(writer)
} }
//export write_recordio //export recordio_write
func write_recordio(writer C.writer, buf *C.uchar, size C.int) int { func recordio_write(writer C.writer, buf *C.uchar, size C.int) C.int {
w := getWriter(writer) w := getWriter(writer)
b := cArrayToSlice(unsafe.Pointer(buf), int(size)) b := cArrayToSlice(unsafe.Pointer(buf), int(size))
_, err := w.w.Write(b) c, err := w.w.Write(b)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return -1 return -1
} }
return 0 return C.int(c)
} }
//export release_recordio //export release_recordio_writer
func release_recordio(writer C.writer) { func release_recordio_writer(writer C.writer) {
w := removeWriter(writer) w := removeWriter(writer)
w.w.Close() w.w.Close()
w.f.Close() w.f.Close()
...@@ -78,7 +78,7 @@ func release_recordio(writer C.writer) { ...@@ -78,7 +78,7 @@ func release_recordio(writer C.writer) {
//export create_recordio_reader //export create_recordio_reader
func create_recordio_reader(path *C.char) C.reader { func create_recordio_reader(path *C.char) C.reader {
p := C.GoString(path) p := C.GoString(path)
s, err := recordio.NewMultiScanner(strings.Split(p, ",")) s, err := recordio.NewScanner(strings.Split(p, ",")...)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return -1 return -1
...@@ -88,24 +88,23 @@ func create_recordio_reader(path *C.char) C.reader { ...@@ -88,24 +88,23 @@ func create_recordio_reader(path *C.char) C.reader {
return addReader(r) return addReader(r)
} }
//export read_next_item //export recordio_read
func read_next_item(reader C.reader, size *C.int) *C.uchar { func recordio_read(reader C.reader, record **C.uchar) C.int {
r := getReader(reader) r := getReader(reader)
if r.scanner.Scan() { if r.scanner.Scan() {
buf := r.scanner.Record() buf := r.scanner.Record()
*size = C.int(len(buf))
if len(buf) == 0 { if len(buf) == 0 {
return (*C.uchar)(nullPtr) *record = (*C.uchar)(nullPtr)
return 0
} }
ptr := C.malloc(C.size_t(len(buf))) size := C.int(len(buf))
C.memcpy(ptr, unsafe.Pointer(&buf[0]), C.size_t(len(buf))) *record = (*C.uchar)(C.malloc(C.size_t(len(buf))))
return (*C.uchar)(ptr) C.memcpy(unsafe.Pointer(*record), unsafe.Pointer(&buf[0]), C.size_t(len(buf)))
return size
} }
*size = -1 return -1
return (*C.uchar)(nullPtr)
} }
//export release_recordio_reader //export release_recordio_reader
......
...@@ -12,44 +12,43 @@ void fail() { ...@@ -12,44 +12,43 @@ void fail() {
int main() { int main() {
writer w = create_recordio_writer("/tmp/test_recordio_0"); writer w = create_recordio_writer("/tmp/test_recordio_0");
write_recordio(w, "hello", 6); recordio_write(w, "hello", 6);
write_recordio(w, "hi", 3); recordio_write(w, "hi", 3);
release_recordio(w); release_recordio_writer(w);
w = create_recordio_writer("/tmp/test_recordio_1"); w = create_recordio_writer("/tmp/test_recordio_1");
write_recordio(w, "dog", 4); recordio_write(w, "dog", 4);
write_recordio(w, "cat", 4); recordio_write(w, "cat", 4);
release_recordio(w); release_recordio_writer(w);
reader r = create_recordio_reader("/tmp/test_recordio_*"); reader r = create_recordio_reader("/tmp/test_recordio_*");
int size; unsigned char* item = NULL;
unsigned char* item = read_next_item(r, &size); int size = recordio_read(r, &item);
if (strcmp(item, "hello") || size != 6) { if (strcmp(item, "hello") || size != 6) {
fail(); fail();
} }
free(item); free(item);
item = read_next_item(r, &size); size = recordio_read(r, &item);
if (strcmp(item, "hi") || size != 3) { if (strcmp(item, "hi") || size != 3) {
fail(); fail();
} }
free(item); free(item);
item = read_next_item(r, &size); size = recordio_read(r, &item);
if (strcmp(item, "dog") || size != 4) { if (strcmp(item, "dog") || size != 4) {
fail(); fail();
} }
free(item); free(item);
item = read_next_item(r, &size); size = recordio_read(r, &item);
if (strcmp(item, "cat") || size != 4) { if (strcmp(item, "cat") || size != 4) {
fail(); fail();
} }
free(item); free(item);
item = read_next_item(r, &size); size = recordio_read(r, &item);
if (item != NULL || size != -1) { if (size != -1) {
fail(); fail();
} }
......
...@@ -74,8 +74,8 @@ func (r *Index) Locate(recordIndex int) (int, int) { ...@@ -74,8 +74,8 @@ func (r *Index) Locate(recordIndex int) (int, int) {
return -1, -1 return -1, -1
} }
// Scanner scans records in a specified range within [0, numRecords). // RangeScanner scans records in a specified range within [0, numRecords).
type Scanner struct { type RangeScanner struct {
reader io.ReadSeeker reader io.ReadSeeker
index *Index index *Index
start, end, cur int start, end, cur int
...@@ -84,10 +84,10 @@ type Scanner struct { ...@@ -84,10 +84,10 @@ type Scanner struct {
err error err error
} }
// NewScanner creates a scanner that sequentially reads records in the // NewRangeScanner creates a scanner that sequentially reads records in the
// range [start, start+len). If start < 0, it scans from the // range [start, start+len). If start < 0, it scans from the
// beginning. If len < 0, it scans till the end of file. // beginning. If len < 0, it scans till the end of file.
func NewScanner(r io.ReadSeeker, index *Index, start, len int) *Scanner { func NewRangeScanner(r io.ReadSeeker, index *Index, start, len int) *RangeScanner {
if start < 0 { if start < 0 {
start = 0 start = 0
} }
...@@ -95,7 +95,7 @@ func NewScanner(r io.ReadSeeker, index *Index, start, len int) *Scanner { ...@@ -95,7 +95,7 @@ func NewScanner(r io.ReadSeeker, index *Index, start, len int) *Scanner {
len = index.NumRecords() - start len = index.NumRecords() - start
} }
return &Scanner{ return &RangeScanner{
reader: r, reader: r,
index: index, index: index,
start: start, start: start,
...@@ -108,7 +108,7 @@ func NewScanner(r io.ReadSeeker, index *Index, start, len int) *Scanner { ...@@ -108,7 +108,7 @@ func NewScanner(r io.ReadSeeker, index *Index, start, len int) *Scanner {
// Scan moves the cursor forward for one record and loads the chunk // Scan moves the cursor forward for one record and loads the chunk
// containing the record if it is not loaded yet. // containing the record if it is not loaded yet.
func (s *Scanner) Scan() bool { func (s *RangeScanner) Scan() bool {
s.cur++ s.cur++
if s.cur >= s.end { if s.cur >= s.end {
...@@ -124,14 +124,14 @@ func (s *Scanner) Scan() bool { ...@@ -124,14 +124,14 @@ func (s *Scanner) Scan() bool {
} }
// Record returns the record under the current cursor. // Record returns the record under the current cursor.
func (s *Scanner) Record() []byte { func (s *RangeScanner) Record() []byte {
_, ri := s.index.Locate(s.cur) _, ri := s.index.Locate(s.cur)
return s.chunk.records[ri] return s.chunk.records[ri]
} }
// Err returns the first non-EOF error that was encountered by the // Err returns the first non-EOF error that was encountered by the
// Scanner. // Scanner.
func (s *Scanner) Err() error { func (s *RangeScanner) Err() error {
if s.err == io.EOF { if s.err == io.EOF {
return nil return nil
} }
......
...@@ -68,7 +68,7 @@ func TestWriteAndRead(t *testing.T) { ...@@ -68,7 +68,7 @@ func TestWriteAndRead(t *testing.T) {
2*4)}, // two record lengths 2*4)}, // two record lengths
idx.chunkOffsets) idx.chunkOffsets)
s := NewScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1) s := NewRangeScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1)
i := 0 i := 0
for s.Scan() { for s.Scan() {
assert.Equal(data[i], string(s.Record())) assert.Equal(data[i], string(s.Record()))
......
...@@ -29,7 +29,7 @@ func TestWriteRead(t *testing.T) { ...@@ -29,7 +29,7 @@ func TestWriteRead(t *testing.T) {
t.Fatal("num record does not match:", idx.NumRecords(), total) t.Fatal("num record does not match:", idx.NumRecords(), total)
} }
s := recordio.NewScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1) s := recordio.NewRangeScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1)
i := 0 i := 0
for s.Scan() { for s.Scan() {
if !reflect.DeepEqual(s.Record(), make([]byte, i)) { if !reflect.DeepEqual(s.Record(), make([]byte, i)) {
...@@ -66,7 +66,7 @@ func TestChunkIndex(t *testing.T) { ...@@ -66,7 +66,7 @@ func TestChunkIndex(t *testing.T) {
for i := 0; i < total; i++ { for i := 0; i < total; i++ {
newIdx := idx.ChunkIndex(i) newIdx := idx.ChunkIndex(i)
s := recordio.NewScanner(bytes.NewReader(buf.Bytes()), newIdx, -1, -1) s := recordio.NewRangeScanner(bytes.NewReader(buf.Bytes()), newIdx, -1, -1)
j := 0 j := 0
for s.Scan() { for s.Scan() {
if !reflect.DeepEqual(s.Record(), make([]byte, i)) { if !reflect.DeepEqual(s.Record(), make([]byte, i)) {
......
...@@ -6,18 +6,18 @@ import ( ...@@ -6,18 +6,18 @@ import (
"path/filepath" "path/filepath"
) )
// MultiScanner is a scanner for multiple recordio files. // Scanner is a scanner for multiple recordio files.
type MultiScanner struct { type Scanner struct {
paths []string paths []string
curFile *os.File curFile *os.File
curScanner *Scanner curScanner *RangeScanner
pathIdx int pathIdx int
end bool end bool
err error err error
} }
// NewMultiScanner creates a new MultiScanner. // NewScanner creates a new Scanner.
func NewMultiScanner(paths []string) (*MultiScanner, error) { func NewScanner(paths ...string) (*Scanner, error) {
var ps []string var ps []string
for _, s := range paths { for _, s := range paths {
match, err := filepath.Glob(s) match, err := filepath.Glob(s)
...@@ -32,12 +32,12 @@ func NewMultiScanner(paths []string) (*MultiScanner, error) { ...@@ -32,12 +32,12 @@ func NewMultiScanner(paths []string) (*MultiScanner, error) {
return nil, fmt.Errorf("no valid path provided: %v", paths) return nil, fmt.Errorf("no valid path provided: %v", paths)
} }
return &MultiScanner{paths: ps}, nil return &Scanner{paths: ps}, nil
} }
// Scan moves the cursor forward for one record and loads the chunk // Scan moves the cursor forward for one record and loads the chunk
// containing the record if it is not loaded yet. // containing the record if it is not loaded yet.
func (s *MultiScanner) Scan() bool { func (s *Scanner) Scan() bool {
if s.err != nil { if s.err != nil {
return false return false
} }
...@@ -92,12 +92,12 @@ func (s *MultiScanner) Scan() bool { ...@@ -92,12 +92,12 @@ func (s *MultiScanner) Scan() bool {
// Err returns the first non-EOF error that was encountered by the // Err returns the first non-EOF error that was encountered by the
// Scanner. // Scanner.
func (s *MultiScanner) Err() error { func (s *Scanner) Err() error {
return s.err return s.err
} }
// Record returns the record under the current cursor. // Record returns the record under the current cursor.
func (s *MultiScanner) Record() []byte { func (s *Scanner) Record() []byte {
if s.curScanner == nil { if s.curScanner == nil {
return nil return nil
} }
...@@ -106,7 +106,7 @@ func (s *MultiScanner) Record() []byte { ...@@ -106,7 +106,7 @@ func (s *MultiScanner) Record() []byte {
} }
// Close releases the resources. // Close releases the resources.
func (s *MultiScanner) Close() error { func (s *Scanner) Close() error {
s.curScanner = nil s.curScanner = nil
if s.curFile != nil { if s.curFile != nil {
err := s.curFile.Close() err := s.curFile.Close()
...@@ -116,7 +116,7 @@ func (s *MultiScanner) Close() error { ...@@ -116,7 +116,7 @@ func (s *MultiScanner) Close() error {
return nil return nil
} }
func (s *MultiScanner) nextFile() (bool, error) { func (s *Scanner) nextFile() (bool, error) {
if s.pathIdx >= len(s.paths) { if s.pathIdx >= len(s.paths) {
return false, nil return false, nil
} }
...@@ -135,6 +135,6 @@ func (s *MultiScanner) nextFile() (bool, error) { ...@@ -135,6 +135,6 @@ func (s *MultiScanner) nextFile() (bool, error) {
} }
s.curFile = f s.curFile = f
s.curScanner = NewScanner(f, idx, 0, -1) s.curScanner = NewRangeScanner(f, idx, 0, -1)
return true, nil return true, nil
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册