diff --git a/.travis.yml b/.travis.yml index 387367a2305e7bf582e29538ab9e51571b9ae75b..44b755ee32d204c883f0d74e7ad0f78380918954 100644 --- a/.travis.yml +++ b/.travis.yml @@ -49,6 +49,7 @@ before_install: # Paddle is using protobuf 3.1 currently. Protobuf 3.2 breaks the compatibility. So we specify the python # protobuf version. - pip install numpy wheel 'protobuf==3.1' sphinx==1.5.6 recommonmark sphinx-rtd-theme==0.1.9 virtualenv pre-commit requests==2.9.2 LinkChecker + - pip install rarfile - | function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; } script: diff --git a/Dockerfile b/Dockerfile index 571c3e1476e1f3cb0750b6fffa74ee1625a7d19d..b6f99ca539d077164c71d797a5ccda7b1b5c44ba 100644 --- a/Dockerfile +++ b/Dockerfile @@ -56,7 +56,8 @@ RUN pip install --upgrade pip && \ pip install -U docopt PyYAML sphinx && \ pip install -U sphinx-rtd-theme==0.1.9 recommonmark && \ pip install pre-commit 'requests==2.9.2' 'ipython==5.3.0' && \ - pip install 'ipykernel==4.6.0' 'jupyter==1.0.0' + pip install 'ipykernel==4.6.0' 'jupyter==1.0.0' && \ + pip install rarfile # To fix https://github.com/PaddlePaddle/Paddle/issues/1954, we use # the solution in https://urllib3.readthedocs.io/en/latest/user-guide.html#ssl-py2 diff --git a/demo/seqToseq/api_train_v2.py b/demo/seqToseq/api_train_v2.py index 3072c375123a2713c655b09fb28001960c9ab64d..bb535f09260613098681db212ffc91631acf67e2 100644 --- a/demo/seqToseq/api_train_v2.py +++ b/demo/seqToseq/api_train_v2.py @@ -21,9 +21,12 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False): size=word_vector_dim, param_attr=paddle.attr.ParamAttr(name='_source_language_embedding')) src_forward = paddle.networks.simple_gru( - input=src_embedding, size=encoder_size) + name='src_forward_gru', input=src_embedding, size=encoder_size) src_backward = paddle.networks.simple_gru( - input=src_embedding, size=encoder_size, reverse=True) + name='src_backward_gru', + input=src_embedding, + size=encoder_size, + reverse=True) encoded_vector = paddle.layer.concat(input=[src_forward, src_backward]) #### Decoder @@ -34,7 +37,9 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False): backward_first = paddle.layer.first_seq(input=src_backward) with paddle.layer.mixed( - size=decoder_size, act=paddle.activation.Tanh()) as decoder_boot: + name="decoder_boot_mixed", + size=decoder_size, + act=paddle.activation.Tanh()) as decoder_boot: decoder_boot += paddle.layer.full_matrix_projection( input=backward_first) @@ -44,11 +49,17 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False): name='gru_decoder', size=decoder_size, boot_layer=decoder_boot) context = paddle.networks.simple_attention( + name="simple_attention", encoded_sequence=enc_vec, encoded_proj=enc_proj, decoder_state=decoder_mem) - with paddle.layer.mixed(size=decoder_size * 3) as decoder_inputs: + with paddle.layer.mixed( + name="input_recurrent", + size=decoder_size * 3, + # enable error clipping + layer_attr=paddle.attr.ExtraAttr( + error_clipping_threshold=100.0)) as decoder_inputs: decoder_inputs += paddle.layer.full_matrix_projection(input=context) decoder_inputs += paddle.layer.full_matrix_projection( input=current_word) @@ -57,9 +68,12 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False): name='gru_decoder', input=decoder_inputs, output_mem=decoder_mem, + # uncomment to enable local threshold for gradient clipping + # param_attr=paddle.attr.ParamAttr(gradient_clipping_threshold=9.9), size=decoder_size) with paddle.layer.mixed( + 
name="gru_step_output", size=target_dict_dim, bias_attr=True, act=paddle.activation.Softmax()) as out: @@ -125,7 +139,13 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False): def main(): - paddle.init(use_gpu=False, trainer_count=1) + paddle.init( + use_gpu=False, + trainer_count=1, + # log gradient clipping info + log_clipping=True, + # log error clipping info + log_error_clipping=True) is_generating = False # source and target dict dim. @@ -140,6 +160,8 @@ def main(): # define optimize method and trainer optimizer = paddle.optimizer.Adam( learning_rate=5e-5, + # uncomment to enable global threshold for gradient clipping + # gradient_clipping_threshold=10.0, regularization=paddle.optimizer.L2Regularization(rate=8e-4)) trainer = paddle.trainer.SGD(cost=cost, parameters=parameters, diff --git a/paddle/go/CMakeLists.txt b/paddle/go/CMakeLists.txt index 20f1476943346b256822d0d8a164b0891f34bafe..51c5252d66374fbc55abc0e8ede8fccd0f4dead7 100644 --- a/paddle/go/CMakeLists.txt +++ b/paddle/go/CMakeLists.txt @@ -2,8 +2,10 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR}) go_library(adder SRCS adder.go) -cc_test(cgo_test +if (WITH_TESTING) + cc_test(cgo_test SRCS cgo_test.cc DEPS adder) +endif() diff --git a/paddle/go/cclient/CMakeLists.txt b/paddle/go/cclient/CMakeLists.txt index 29a2089fb10baf24ca4ab77675987c41cbea1c37..c85ff3db09d442a3e51f061993b5f02f3e69e2bb 100644 --- a/paddle/go/cclient/CMakeLists.txt +++ b/paddle/go/cclient/CMakeLists.txt @@ -2,12 +2,9 @@ cmake_minimum_required(VERSION 3.0) if(GTEST_INCLUDE_DIR AND GTEST_LIBRARIES) message("-- Found gtest (include: ${GTEST_INCLUDE_DIR}, library: ${GTEST_LIBRARIES})") -else() - # find #include - get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) - include_directories(${PARENT_DIR}) - +else() # find cmake directory modules + get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY) get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY) diff --git a/paddle/go/cclient/test/CMakeLists.txt b/paddle/go/cclient/test/CMakeLists.txt index c899bd275d37bc28d4aefb2476310581a57e5bb4..de7ef6a47ac881daaedf461e337e61c74b927884 100644 --- a/paddle/go/cclient/test/CMakeLists.txt +++ b/paddle/go/cclient/test/CMakeLists.txt @@ -1,8 +1,8 @@ cmake_minimum_required(VERSION 3.0) -include_directories(/env/gopath/src/github.com/PaddlePaddle/Paddle/paddle/go/cclient/build/) +include_directories(${CMAKE_BINARY_DIR}) add_executable(main main.c) add_dependencies(main client) set (CMAKE_EXE_LINKER_FLAGS "-pthread") -target_link_libraries(main /env/gopath/src/github.com/PaddlePaddle/Paddle/paddle/go/cclient/build/libclient.a) # ${GTEST_LIBRARIES}) +target_link_libraries(main ${CMAKE_BINARY_DIR}/libclient.a) diff --git a/paddle/go/recordio/README.md b/paddle/go/recordio/README.md new file mode 100644 index 0000000000000000000000000000000000000000..8b0b9308b1ade3560d6bda150ea0139a9fb2503b --- /dev/null +++ b/paddle/go/recordio/README.md @@ -0,0 +1,36 @@ +# RecordIO + +## Write + +```go +f, e := os.Create("a_file.recordio") +w := recordio.NewWriter(f) +w.Write([]byte("Hello")) +w.Write([]byte("World!")) +w.Close() +``` + +## Read + +1. Load chunk index: + + ```go + f, e := os.Open("a_file.recordio") + idx, e := recordio.LoadIndex(f) + fmt.Println("Total records: ", idx.Len()) + ``` + +2. Create one or more scanner to read a range of records. 
The
+   following example reads the range
+   [1, 3), i.e., the second and the third records:
+
+   ```go
+   f, e := os.Open("a_file.recordio")
+   s := recordio.NewScanner(f, idx, 1, 2) // start=1, len=2 covers the range [1, 3).
+   for s.Scan() {
+      fmt.Println(string(s.Record()))
+   }
+   if s.Error() != nil && s.Error() != io.EOF {
+      log.Fatalf("Something wrong with scanning: %v", s.Error())
+   }
+   ```
diff --git a/paddle/go/recordio/chunk.go b/paddle/go/recordio/chunk.go
new file mode 100644
index 0000000000000000000000000000000000000000..4e983ab72bddbfec23929e1874142bc7673b317b
--- /dev/null
+++ b/paddle/go/recordio/chunk.go
@@ -0,0 +1,181 @@
+package recordio
+
+import (
+	"bytes"
+	"compress/gzip"
+	"encoding/binary"
+	"fmt"
+	"hash/crc32"
+	"io"
+
+	"github.com/golang/snappy"
+)
+
+// A Chunk contains the Header and optionally compressed records. To
+// create a chunk, just use ch := &Chunk{}.
+type Chunk struct {
+	records  [][]byte
+	numBytes int // sum of record lengths.
+}
+
+func (ch *Chunk) add(record []byte) {
+	ch.records = append(ch.records, record)
+	ch.numBytes += len(record)
+}
+
+// dump writes the chunk into w, then clears the chunk, making it
+// ready for the next add invocation.
+func (ch *Chunk) dump(w io.Writer, compressorIndex int) error {
+	// NOTE: don't check ch.numBytes == 0 instead, because empty
+	// records are allowed and contribute no bytes.
+	if len(ch.records) == 0 {
+		return nil
+	}
+
+	// Write raw records and their lengths into data buffer.
+	var data bytes.Buffer
+
+	for _, r := range ch.records {
+		var rs [4]byte
+		binary.LittleEndian.PutUint32(rs[:], uint32(len(r)))
+
+		if _, e := data.Write(rs[:]); e != nil {
+			return fmt.Errorf("Failed to write record length: %v", e)
+		}
+
+		if _, e := data.Write(r); e != nil {
+			return fmt.Errorf("Failed to write record: %v", e)
+		}
+	}
+
+	compressed, e := compressData(&data, compressorIndex)
+	if e != nil {
+		return e
+	}
+
+	// Write chunk header and compressed data.
+	hdr := &Header{
+		checkSum:       crc32.ChecksumIEEE(compressed.Bytes()),
+		compressor:     uint32(compressorIndex),
+		compressedSize: uint32(compressed.Len()),
+		numRecords:     uint32(len(ch.records)),
+	}
+
+	if _, e := hdr.write(w); e != nil {
+		return fmt.Errorf("Failed to write chunk header: %v", e)
+	}
+
+	if _, e := w.Write(compressed.Bytes()); e != nil {
+		return fmt.Errorf("Failed to write chunk data: %v", e)
+	}
+
+	// Clear the current chunk.
+	ch.records = nil
+	ch.numBytes = 0
+
+	return nil
+}
+
+type noopCompressor struct {
+	*bytes.Buffer
+}
+
+func (c *noopCompressor) Close() error {
+	return nil
+}
+
+func compressData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) {
+	compressed := new(bytes.Buffer)
+	var compressor io.WriteCloser
+
+	switch compressorIndex {
+	case NoCompression:
+		compressor = &noopCompressor{compressed}
+	case Snappy:
+		compressor = snappy.NewBufferedWriter(compressed)
+	case Gzip:
+		compressor = gzip.NewWriter(compressed)
+	default:
+		return nil, fmt.Errorf("Unknown compression algorithm: %d", compressorIndex)
+	}
+
+	if _, e := io.Copy(compressor, src); e != nil {
+		return nil, fmt.Errorf("Failed to compress chunk data: %v", e)
+	}
+	compressor.Close()
+
+	return compressed, nil
+}
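For reference, a chunk as `dump` lays it out on disk is a 20-byte header (magic number, CRC32 checksum of the compressed payload, compressor index, compressed size, and record count; see header.go below) followed by the compressed payload: a little-endian uint32 length before each record's bytes. A standalone sketch of building the raw payload, mirroring the loop in `dump` (the record values are illustrative only):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	// Build the uncompressed chunk payload for two records,
	// exactly as Chunk.dump does before compression.
	var payload bytes.Buffer
	for _, r := range [][]byte{[]byte("Hello"), []byte("World!")} {
		var rs [4]byte
		binary.LittleEndian.PutUint32(rs[:], uint32(len(r)))
		payload.Write(rs[:]) // 4-byte record length
		payload.Write(r)     // record bytes
	}
	// (4+5) for "Hello" plus (4+6) for "World!" = 19 bytes before compression.
	fmt.Println(payload.Len())
}
```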
+// parseChunk parses the chunk at chunkOffset from r.
+func parseChunk(r io.ReadSeeker, chunkOffset int64) (*Chunk, error) {
+	var e error
+	var hdr *Header
+
+	if _, e = r.Seek(chunkOffset, io.SeekStart); e != nil {
+		return nil, fmt.Errorf("Failed to seek chunk: %v", e)
+	}
+
+	hdr, e = parseHeader(r)
+	if e != nil {
+		return nil, fmt.Errorf("Failed to parse chunk header: %v", e)
+	}
+
+	var buf bytes.Buffer
+	if _, e = io.CopyN(&buf, r, int64(hdr.compressedSize)); e != nil {
+		return nil, fmt.Errorf("Failed to read chunk data: %v", e)
+	}
+
+	if hdr.checkSum != crc32.ChecksumIEEE(buf.Bytes()) {
+		return nil, fmt.Errorf("Checksum checking failed")
+	}
+
+	deflated, e := deflateData(&buf, int(hdr.compressor))
+	if e != nil {
+		return nil, e
+	}
+
+	ch := &Chunk{}
+	for i := 0; i < int(hdr.numRecords); i++ {
+		var rs [4]byte
+		if _, e = deflated.Read(rs[:]); e != nil {
+			return nil, fmt.Errorf("Failed to read record length: %v", e)
+		}
+
+		r := make([]byte, binary.LittleEndian.Uint32(rs[:]))
+		if _, e = deflated.Read(r); e != nil {
+			return nil, fmt.Errorf("Failed to read a record: %v", e)
+		}
+
+		ch.records = append(ch.records, r)
+		ch.numBytes += len(r)
+	}
+
+	return ch, nil
+}
+
+// deflateData decompresses src with the given compressor index.
+func deflateData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) {
+	var e error
+	var deflator io.Reader
+
+	switch compressorIndex {
+	case NoCompression:
+		deflator = src
+	case Snappy:
+		deflator = snappy.NewReader(src)
+	case Gzip:
+		deflator, e = gzip.NewReader(src)
+		if e != nil {
+			return nil, fmt.Errorf("Failed to create gzip reader: %v", e)
+		}
+	default:
+		return nil, fmt.Errorf("Unknown compression algorithm: %d", compressorIndex)
+	}
+
+	deflated := new(bytes.Buffer)
+	if _, e = io.Copy(deflated, deflator); e != nil {
+		return nil, fmt.Errorf("Failed to decompress chunk data: %v", e)
+	}
+
+	return deflated, nil
+}
diff --git a/paddle/go/recordio/header.go b/paddle/go/recordio/header.go
new file mode 100644
index 0000000000000000000000000000000000000000..d3aefae3646eb002bd2c31789c7eb182faf02b1f
--- /dev/null
+++ b/paddle/go/recordio/header.go
@@ -0,0 +1,59 @@
+package recordio
+
+import (
+	"encoding/binary"
+	"fmt"
+	"io"
+)
+
+const (
+	// NoCompression means writing raw chunk data into files.
+	// With other choices, chunks are compressed before being written.
+	NoCompression = iota
+	// Snappy is the default compression algorithm, widely used
+	// at Google. It strikes a balance between speed and
+	// compression ratio.
+	Snappy
+	// Gzip is a well-known compression algorithm. It is
+	// recommended only when you are optimizing for compression ratio.
+	Gzip
+
+	magicNumber uint32 = 0x01020304
+	defaultCompressor = Snappy
+)
+
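The three compressor indices above drive the switch statements in `compressData` and `deflateData` in chunk.go. A hypothetical in-package test (not part of this PR) sketching that the two functions are inverses for every index:

```go
package recordio

import (
	"bytes"
	"testing"
)

// TestCompressRoundTrip checks that compressData and deflateData
// restore the original bytes for every compressor index.
func TestCompressRoundTrip(t *testing.T) {
	src := []byte("hello recordio")
	for _, c := range []int{NoCompression, Snappy, Gzip} {
		compressed, e := compressData(bytes.NewReader(src), c)
		if e != nil {
			t.Fatal(e)
		}
		restored, e := deflateData(compressed, c)
		if e != nil {
			t.Fatal(e)
		}
		if !bytes.Equal(restored.Bytes(), src) {
			t.Errorf("compressor %d: got %q, want %q", c, restored.Bytes(), src)
		}
	}
}
```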
+// Header is the metadata of a Chunk.
+type Header struct {
+	checkSum       uint32
+	compressor     uint32
+	compressedSize uint32
+	numRecords     uint32
+}
+
+func (c *Header) write(w io.Writer) (int, error) {
+	var buf [20]byte
+	binary.LittleEndian.PutUint32(buf[0:4], magicNumber)
+	binary.LittleEndian.PutUint32(buf[4:8], c.checkSum)
+	binary.LittleEndian.PutUint32(buf[8:12], c.compressor)
+	binary.LittleEndian.PutUint32(buf[12:16], c.compressedSize)
+	binary.LittleEndian.PutUint32(buf[16:20], c.numRecords)
+	return w.Write(buf[:])
+}
+
+func parseHeader(r io.Reader) (*Header, error) {
+	var buf [20]byte
+	// io.ReadFull guards against short reads from streaming readers.
+	if _, e := io.ReadFull(r, buf[:]); e != nil {
+		return nil, e
+	}
+
+	if v := binary.LittleEndian.Uint32(buf[0:4]); v != magicNumber {
+		return nil, fmt.Errorf("Failed to parse magic number")
+	}
+
+	return &Header{
+		checkSum:       binary.LittleEndian.Uint32(buf[4:8]),
+		compressor:     binary.LittleEndian.Uint32(buf[8:12]),
+		compressedSize: binary.LittleEndian.Uint32(buf[12:16]),
+		numRecords:     binary.LittleEndian.Uint32(buf[16:20]),
+	}, nil
+}
diff --git a/paddle/go/recordio/reader.go b/paddle/go/recordio/reader.go
new file mode 100644
index 0000000000000000000000000000000000000000..a12c604f7b2f5c103624aac538034ec6a883c536
--- /dev/null
+++ b/paddle/go/recordio/reader.go
@@ -0,0 +1,135 @@
+package recordio
+
+import "io"
+
+// Index consists of the offsets and sizes of the consecutive chunks in a RecordIO file.
+type Index struct {
+	chunkOffsets []int64
+	chunkLens    []uint32 // the number of records in each chunk.
+	numRecords   int      // the total number of records in the file.
+	chunkRecords []int    // the number of records in each chunk.
+}
+
+// LoadIndex scans the file and parses chunkOffsets, chunkLens, and numRecords.
+func LoadIndex(r io.ReadSeeker) (*Index, error) {
+	f := &Index{}
+	offset := int64(0)
+	var e error
+	var hdr *Header
+
+	for {
+		hdr, e = parseHeader(r)
+		if e != nil {
+			break
+		}
+
+		f.chunkOffsets = append(f.chunkOffsets, offset)
+		f.chunkLens = append(f.chunkLens, hdr.numRecords)
+		f.chunkRecords = append(f.chunkRecords, int(hdr.numRecords))
+		f.numRecords += int(hdr.numRecords)
+
+		offset, e = r.Seek(int64(hdr.compressedSize), io.SeekCurrent)
+		if e != nil {
+			break
+		}
+	}
+
+	if e == io.EOF {
+		return f, nil
+	}
+	return nil, e
+}
+
+// NumRecords returns the total number of records in a RecordIO file.
+func (r *Index) NumRecords() int {
+	return r.numRecords
+}
+
+// NumChunks returns the total number of chunks in a RecordIO file.
+func (r *Index) NumChunks() int {
+	return len(r.chunkLens)
+}
+
+// ChunkIndex returns the Index of the i-th Chunk.
+func (r *Index) ChunkIndex(i int) *Index {
+	idx := &Index{}
+	idx.chunkOffsets = []int64{r.chunkOffsets[i]}
+	idx.chunkLens = []uint32{r.chunkLens[i]}
+	idx.chunkRecords = []int{r.chunkRecords[i]}
+	idx.numRecords = idx.chunkRecords[0]
+	return idx
+}
+
+// Locate returns the index of chunk that contains the given record,
+// and the record index within the chunk.  It returns (-1, -1) if the
+// record is out of range.
+func (r *Index) Locate(recordIndex int) (int, int) {
+	sum := 0
+	for i, l := range r.chunkLens {
+		sum += int(l)
+		if recordIndex < sum {
+			return i, recordIndex - sum + int(l)
+		}
+	}
+	return -1, -1
+}
+
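Locate's return arithmetic is easy to misread: when `recordIndex < sum`, the expression `recordIndex - sum + int(l)` is the record's offset within chunk i, because `sum - int(l)` is the number of records in all earlier chunks. A hypothetical in-package test (not part of this PR) tracing it:

```go
package recordio

import "testing"

// TestLocate traces Locate over an index with two records in the
// first chunk and one in the second.
func TestLocate(t *testing.T) {
	idx := &Index{chunkLens: []uint32{2, 1}}
	cases := []struct{ record, chunk, inChunk int }{
		{0, 0, 0},   // first record of chunk 0
		{1, 0, 1},   // second record of chunk 0
		{2, 1, 0},   // first record of chunk 1
		{3, -1, -1}, // out of range
	}
	for _, c := range cases {
		if ci, ri := idx.Locate(c.record); ci != c.chunk || ri != c.inChunk {
			t.Errorf("Locate(%d) = (%d, %d), want (%d, %d)",
				c.record, ci, ri, c.chunk, c.inChunk)
		}
	}
}
```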
+// Scanner scans records in a specified range within [0, numRecords).
+type Scanner struct {
+	reader          io.ReadSeeker
+	index           *Index
+	start, end, cur int
+	chunkIndex      int
+	chunk           *Chunk
+	err             error
+}
+
+// NewScanner creates a scanner that sequentially reads records in the
+// range [start, start+len).  If start < 0, it scans from the
+// beginning.  If len < 0, it scans till the end of file.
+func NewScanner(r io.ReadSeeker, index *Index, start, len int) *Scanner {
+	if start < 0 {
+		start = 0
+	}
+	if len < 0 || start+len >= index.NumRecords() {
+		len = index.NumRecords() - start
+	}
+
+	return &Scanner{
+		reader:     r,
+		index:      index,
+		start:      start,
+		end:        start + len,
+		cur:        start - 1, // The initial state required by Scan.
+		chunkIndex: -1,
+		chunk:      &Chunk{},
+	}
+}
+
+// Scan advances the cursor by one record and loads the chunk
+// containing the record if it is not already loaded.
+func (s *Scanner) Scan() bool {
+	s.cur++
+
+	if s.cur >= s.end {
+		s.err = io.EOF
+	} else {
+		if ci, _ := s.index.Locate(s.cur); s.chunkIndex != ci {
+			s.chunkIndex = ci
+			s.chunk, s.err = parseChunk(s.reader, s.index.chunkOffsets[ci])
+		}
+	}
+
+	return s.err == nil
+}
+
+// Record returns the record under the current cursor.
+func (s *Scanner) Record() []byte {
+	_, ri := s.index.Locate(s.cur)
+	return s.chunk.records[ri]
+}
+
+// Error returns the error that stopped Scan.
+func (s *Scanner) Error() error {
+	return s.err
+}
diff --git a/paddle/go/recordio/recordio_internal_test.go b/paddle/go/recordio/recordio_internal_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..e0f7dd0407caaf38e8113660239d1a0c6eb8afa1
--- /dev/null
+++ b/paddle/go/recordio/recordio_internal_test.go
@@ -0,0 +1,90 @@
+package recordio
+
+import (
+	"bytes"
+	"testing"
+	"unsafe"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestChunkHead(t *testing.T) {
+	assert := assert.New(t)
+
+	c := &Header{
+		checkSum:       123,
+		compressor:     456,
+		compressedSize: 789,
+	}
+
+	var buf bytes.Buffer
+	_, e := c.write(&buf)
+	assert.Nil(e)
+
+	cc, e := parseHeader(&buf)
+	assert.Nil(e)
+	assert.Equal(c, cc)
+}
+
+func TestWriteAndRead(t *testing.T) {
+	assert := assert.New(t)
+
+	data := []string{
+		"12345",
+		"1234",
+		"12"}
+
+	var buf bytes.Buffer
+	w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize.
+
+	n, e := w.Write([]byte(data[0])) // does not exceed chunk size.
+	assert.Nil(e)
+	assert.Equal(5, n)
+
+	n, e = w.Write([]byte(data[1])) // does not exceed chunk size.
+	assert.Nil(e)
+	assert.Equal(4, n)
+
+	n, e = w.Write([]byte(data[2])) // exceeds chunk size; dump and create a new chunk.
+	assert.Nil(e)
+	assert.Equal(n, 2)
+
+	assert.Nil(w.Close()) // flush the second chunk.
+	assert.Nil(w.Writer)
+
+	n, e = w.Write([]byte("anything")) // not effective after close.
+	assert.NotNil(e)
+	assert.Equal(n, 0)
+
+	idx, e := LoadIndex(bytes.NewReader(buf.Bytes()))
+	assert.Nil(e)
+	assert.Equal([]uint32{2, 1}, idx.chunkLens)
+	assert.Equal(
+		[]int64{0,
+			int64(4 + // magic number
+				unsafe.Sizeof(Header{}) +
+				5 + // first record
+				4 + // second record
+				2*4)}, // two record lengths
+		idx.chunkOffsets)
+
+	s := NewScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1)
+	i := 0
+	for s.Scan() {
+		assert.Equal(data[i], string(s.Record()))
+		i++
+	}
+}
+
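The `chunkOffsets` assertion above encodes the on-disk size of the first chunk: 4 bytes of magic number, 16 bytes for the four uint32 header fields (which is what `unsafe.Sizeof(Header{})` measures), the 5- and 4-byte records stored uncompressed, and two 4-byte length prefixes. The same arithmetic as a standalone sketch:

```go
package main

import "fmt"

func main() {
	const (
		magic   = 4     // magic number written by Header.write
		header  = 4 * 4 // checkSum, compressor, compressedSize, numRecords
		records = 5 + 4 // "12345" and "1234", stored with NoCompression
		lengths = 2 * 4 // one uint32 length prefix per record
	)
	fmt.Println(magic + header + records + lengths) // 37
}
```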
+func TestWriteEmptyFile(t *testing.T) {
+	assert := assert.New(t)
+
+	var buf bytes.Buffer
+	w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize.
+	assert.Nil(w.Close())
+	assert.Equal(0, buf.Len())
+
+	idx, e := LoadIndex(bytes.NewReader(buf.Bytes()))
+	assert.Nil(e)
+	assert.Equal(0, idx.NumRecords())
+}
diff --git a/paddle/go/recordio/recordio_test.go b/paddle/go/recordio/recordio_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..8bf1b020ab75ca66c12b713526e010756c364217
--- /dev/null
+++ b/paddle/go/recordio/recordio_test.go
@@ -0,0 +1,81 @@
+package recordio_test
+
+import (
+	"bytes"
+	"reflect"
+	"testing"
+
+	"github.com/PaddlePaddle/Paddle/paddle/go/recordio"
+)
+
+func TestWriteRead(t *testing.T) {
+	const total = 1000
+	var buf bytes.Buffer
+	w := recordio.NewWriter(&buf, 0, -1)
+	for i := 0; i < total; i++ {
+		_, err := w.Write(make([]byte, i))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	w.Close()
+
+	idx, err := recordio.LoadIndex(bytes.NewReader(buf.Bytes()))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if idx.NumRecords() != total {
+		t.Fatal("record count does not match:", idx.NumRecords(), total)
+	}
+
+	s := recordio.NewScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1)
+	i := 0
+	for s.Scan() {
+		if !reflect.DeepEqual(s.Record(), make([]byte, i)) {
+			t.Fatal("not equal:", len(s.Record()), len(make([]byte, i)))
+		}
+		i++
+	}
+
+	if i != total {
+		t.Fatal("total count does not match:", i, total)
+	}
+}
+
+func TestChunkIndex(t *testing.T) {
+	const total = 1000
+	var buf bytes.Buffer
+	w := recordio.NewWriter(&buf, 0, -1)
+	for i := 0; i < total; i++ {
+		_, err := w.Write(make([]byte, i))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	w.Close()
+
+	idx, err := recordio.LoadIndex(bytes.NewReader(buf.Bytes()))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if idx.NumChunks() != total {
+		t.Fatal("unexpected chunk num:", idx.NumChunks(), total)
+	}
+
+	for i := 0; i < total; i++ {
+		newIdx := idx.ChunkIndex(i)
+		s := recordio.NewScanner(bytes.NewReader(buf.Bytes()), newIdx, -1, -1)
+		j := 0
+		for s.Scan() {
+			if !reflect.DeepEqual(s.Record(), make([]byte, i)) {
+				t.Fatal("not equal:", len(s.Record()), len(make([]byte, i)))
+			}
+			j++
+		}
+		if j != 1 {
+			t.Fatal("unexpected records per chunk:", j)
+		}
+	}
+}
diff --git a/paddle/go/recordio/writer.go b/paddle/go/recordio/writer.go
new file mode 100644
index 0000000000000000000000000000000000000000..39112e518fb45c66f4e16733924c14a261134d9d
--- /dev/null
+++ b/paddle/go/recordio/writer.go
@@ -0,0 +1,60 @@
+package recordio
+
+import (
+	"fmt"
+	"io"
+)
+
+const (
+	defaultMaxChunkSize = 32 * 1024 * 1024
+)
+
+// Writer creates a RecordIO file.
+type Writer struct {
+	io.Writer    // Set to nil to mark a closed writer.
+	chunk        *Chunk
+	maxChunkSize int // total record size, excluding metadata, before compression.
+	compressor   int
+}
+
+// NewWriter creates a RecordIO file writer.  Records are buffered
+// into chunks of up to maxChunkSize bytes before being compressed
+// with the given compressor (NoCompression, Snappy, or Gzip).  A
+// negative maxChunkSize or compressor selects the defaults: 32 MB
+// chunks and Snappy.
+func NewWriter(w io.Writer, maxChunkSize, compressor int) *Writer {
+	if maxChunkSize < 0 {
+		maxChunkSize = defaultMaxChunkSize
+	}
+
+	if compressor < 0 {
+		compressor = defaultCompressor
+	}
+
+	return &Writer{
+		Writer:       w,
+		chunk:        &Chunk{},
+		maxChunkSize: maxChunkSize,
+		compressor:   compressor}
+}
+
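A minimal usage sketch of the writer (the file name and record contents are made up; error handling is abbreviated):

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/PaddlePaddle/Paddle/paddle/go/recordio"
)

func main() {
	f, err := os.Create("data.recordio")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// 1 MB chunks compressed with gzip; pass -1, -1 to keep the
	// defaults (32 MB chunks, Snappy).
	w := recordio.NewWriter(f, 1024*1024, recordio.Gzip)
	for i := 0; i < 100; i++ {
		if _, err := w.Write([]byte(fmt.Sprintf("record-%d", i))); err != nil {
			log.Fatal(err)
		}
	}
	// Close flushes the last partial chunk.
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
}
```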
+// Write writes a record.  It returns an error if Close has been called.
+func (w *Writer) Write(record []byte) (int, error) {
+	if w.Writer == nil {
+		return 0, fmt.Errorf("Cannot write since writer has been closed")
+	}
+
+	if w.chunk.numBytes+len(record) > w.maxChunkSize {
+		if e := w.chunk.dump(w.Writer, w.compressor); e != nil {
+			return 0, e
+		}
+	}
+
+	w.chunk.add(record)
+	return len(record), nil
+}
+
+// Close flushes the current chunk and makes the writer invalid.
+func (w *Writer) Close() error {
+	e := w.chunk.dump(w.Writer, w.compressor)
+	w.Writer = nil
+	return e
+}
diff --git a/paddle/parameter/FirstOrderOptimizer.cpp b/paddle/parameter/FirstOrderOptimizer.cpp
index dbb738e98b5874f5bb33026ad585a6c3ef327d1d..5938b2210c7174c9a0ce659220825b74af007db5 100644
--- a/paddle/parameter/FirstOrderOptimizer.cpp
+++ b/paddle/parameter/FirstOrderOptimizer.cpp
@@ -161,6 +161,7 @@ void AdaDeltaParameterOptimizer::update(const VectorPtr vecs[],
                                         const ParameterConfig& config,
                                         size_t sparseId) const {
   CHECK(sparseId == -1LU) << "Sparse update is not supported";
+
   BaseMatrix& value = *vecs[PARAMETER_VALUE];
   BaseMatrix& grad = *vecs[PARAMETER_GRADIENT];
   BaseMatrix& mom = *vecs[PARAMETER_MOMENTUM];
@@ -265,6 +266,7 @@ void AdamParameterOptimizer::update(const VectorPtr vecs[],
                                     const ParameterConfig& config,
                                     size_t sparseId) const {
   CHECK(sparseId == -1UL) << "Sparse update is not supported";
+
   real beta1_power = std::pow(beta1_, step_);
   real beta2_power = std::pow(beta2_, step_);
   real learningRate = config.learning_rate() * learningRate_;
@@ -303,18 +305,25 @@ void AdamaxParameterOptimizer::update(const VectorPtr vecs[],
 void OptimizerWithGradientClipping::update(const VectorPtr vecs[],
                                            const ParameterConfig& config,
                                            size_t sparseId) const {
+  real globalThreshold = optConfig_.gradient_clipping_threshold();
+  real localThreshold = config.gradient_clipping_threshold();
+
+  // Use the local gradient clipping threshold if it's enabled,
+  // otherwise use the global one.
+  real threshold = localThreshold > 0.0f ? localThreshold : globalThreshold;
+  std::string field = localThreshold > 0.0f ?
"local" : "global"; + real maxAbsGrad = vecs[PARAMETER_GRADIENT]->getAbsMax(); - if (maxAbsGrad > config.gradient_clipping_threshold()) { + if (maxAbsGrad > threshold) { if (FLAGS_log_clipping) { real avgAbsGrad = vecs[PARAMETER_GRADIENT]->getAbsSum() / vecs[PARAMETER_GRADIENT]->getSize(); - LOG(INFO) << "parameter=" << config.name() << " need clipping," - << " max grad=" << maxAbsGrad << " avg grad=" << avgAbsGrad; + LOG(INFO) << "parameter=" << config.name() << " need clipping by " + << field << " threshold=" << threshold + << ", max grad=" << maxAbsGrad << ", avg grad=" << avgAbsGrad; } - vecs[PARAMETER_GRADIENT]->clip(-config.gradient_clipping_threshold(), - config.gradient_clipping_threshold()); + vecs[PARAMETER_GRADIENT]->clip(-threshold, threshold); } - optimizer_->update(vecs, config, sparseId); } diff --git a/paddle/parameter/OptimizerWithRegularizer.cpp b/paddle/parameter/OptimizerWithRegularizer.cpp index 85f13c8bc08c534224a1a8365d541737980b439f..7910b12444938a0555c211bb3dfd0f4209e480ec 100644 --- a/paddle/parameter/OptimizerWithRegularizer.cpp +++ b/paddle/parameter/OptimizerWithRegularizer.cpp @@ -131,7 +131,8 @@ ParameterOptimizer* OptimizerWithRegularizer::create( bool inPserver) { ParameterOptimizer* optimizer = ParameterOptimizer::create(optConfig, inPserver); - if (paraConfig.gradient_clipping_threshold() > 0.0f && + if ((optConfig.gradient_clipping_threshold() > 0.0f || + paraConfig.gradient_clipping_threshold() > 0.0f) && !dynamic_cast(optimizer)) { optimizer = new OptimizerWithGradientClipping(optConfig, optimizer); } diff --git a/paddle/parameter/ParameterOptimizer.h b/paddle/parameter/ParameterOptimizer.h index 2bdc793d605e01f8e055087bb3e0973168cb0213..f98ba569b569379b30d034739a7f84aaf97108db 100644 --- a/paddle/parameter/ParameterOptimizer.h +++ b/paddle/parameter/ParameterOptimizer.h @@ -167,6 +167,7 @@ public: } parameterTypes_.push_back(type); } + real getLearningRate() const { return learningRate_; } virtual void setNoDecay() { applyDecay_ = false; } @@ -201,6 +202,7 @@ protected: * so, if lr change in StartBatch, please assign to learningRate_ */ real learningRate_; + std::unique_ptr learningRateScheduler_; int64_t pass_; // current training pass (starting from 0) bool firstTime_; diff --git a/paddle/scripts/run_python_tests.sh b/paddle/scripts/run_python_tests.sh index 02d2cdb977473c1032b06ffca59544b3ba98d1fa..c588b9e08def5510e5daea2fd4e18f66c5f89bb8 100755 --- a/paddle/scripts/run_python_tests.sh +++ b/paddle/scripts/run_python_tests.sh @@ -29,7 +29,7 @@ if [ $USE_VIRTUALENV_FOR_TEST -ne 0 ]; then fi export PYTHONPATH=$SCRIPTPATH/../../python/ -$PYTHON -m pip install $SCRIPTPATH/../dist/*.whl requests matplotlib opencv-python ipython==5.3 +$PYTHON -m pip install $SCRIPTPATH/../dist/*.whl requests matplotlib opencv-python ipython==5.3 rarfile for fn in "$@" do diff --git a/proto/TrainerConfig.proto b/proto/TrainerConfig.proto index a334e07b6282a6ff9867482e0c3a299df2a78d1d..a819d20d11ff3932d331801007b8cfb9c77a3f2b 100644 --- a/proto/TrainerConfig.proto +++ b/proto/TrainerConfig.proto @@ -128,6 +128,9 @@ message OptimizationConfig { // when async_lagged_grad_discard_ratio * num_gradient_servers commit passed, // current async gradient will be discard silently. 
optional double async_lagged_grad_discard_ratio = 37 [default = 1.5]; + + // global threshold for gradient clipping + optional double gradient_clipping_threshold = 38 [default = 0.0]; }; message TrainerConfig { diff --git a/python/paddle/trainer/config_parser.py b/python/paddle/trainer/config_parser.py index 9135f38719a44e3070f42e478d0fc6b0004227b5..9fe8794691e5f742b3c290850d7f2f4db4862cf4 100644 --- a/python/paddle/trainer/config_parser.py +++ b/python/paddle/trainer/config_parser.py @@ -3377,6 +3377,7 @@ settings = dict( algorithm='async_sgd', async_lagged_grad_discard_ratio=1.5, learning_method='momentum', + gradient_clipping_threshold=None, num_batches_per_send_parameter=None, num_batches_per_get_parameter=None, center_parameter_update_method=None, diff --git a/python/paddle/trainer_config_helpers/optimizers.py b/python/paddle/trainer_config_helpers/optimizers.py index a53ebe160be3b5d6d115e3e15d059d3d87e80942..c3495ee110bfaf91a47637a52e88b3bb56dce7a9 100644 --- a/python/paddle/trainer_config_helpers/optimizers.py +++ b/python/paddle/trainer_config_helpers/optimizers.py @@ -408,7 +408,8 @@ def settings(batch_size, args = [ 'batch_size', 'learning_rate', 'learning_rate_decay_a', - 'learning_rate_decay_b', 'learning_rate_schedule', 'learning_rate_args' + 'learning_rate_decay_b', 'learning_rate_schedule', 'learning_rate_args', + 'gradient_clipping_threshold' ] kwargs = dict() kwargs['algorithm'] = algorithm diff --git a/python/paddle/v2/dataset/__init__.py b/python/paddle/v2/dataset/__init__.py index 80ff6295c34e853d8f69b9e78719af23a56d1fbb..26252d5bbd77ddb70b4f03843679e4737e2f96d3 100644 --- a/python/paddle/v2/dataset/__init__.py +++ b/python/paddle/v2/dataset/__init__.py @@ -24,8 +24,9 @@ import conll05 import uci_housing import sentiment import wmt14 +import mq2007 __all__ = [ 'mnist', 'imikolov', 'imdb', 'cifar', 'movielens', 'conll05', 'sentiment' - 'uci_housing', 'wmt14' + 'uci_housing', 'wmt14', 'mq2007' ] diff --git a/python/paddle/v2/dataset/mq2007.py b/python/paddle/v2/dataset/mq2007.py new file mode 100644 index 0000000000000000000000000000000000000000..fd71b341662ca6f540ce44a86348e782561a97d7 --- /dev/null +++ b/python/paddle/v2/dataset/mq2007.py @@ -0,0 +1,337 @@ +# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +MQ2007 dataset + +MQ2007 is a query set from Million Query track of TREC 2007. There are about 1700 queries in it with labeled documents. In MQ2007, the 5-fold cross +validation strategy is adopted and the 5-fold partitions are included in the package. In each fold, there are three subsets for learning: training set, +validation set and testing set. 
+
+This module downloads the MQ2007 dataset from
+http://research.microsoft.com/en-us/um/beijing/projects/letor/LETOR4.0/Data/MQ2007.rar
+and parses the training and test sets into paddle reader creators.
+
+"""
+
+import os
+import random
+import functools
+import rarfile
+import sys
+from common import download
+import numpy as np
+
+# URL = "http://research.microsoft.com/en-us/um/beijing/projects/letor/LETOR4.0/Data/MQ2007.rar"
+URL = "http://www.bigdatalab.ac.cn/benchmark/upload/download_source/7b6dbbe2-842c-11e4-a536-bcaec51b9163_MQ2007.rar"
+MD5 = "7be1640ae95c6408dab0ae7207bdc706"
+
+
+def __initialize_meta_info__():
+    """
+    download and extract the MQ2007 dataset
+    """
+    fn = fetch()
+    rar = rarfile.RarFile(fn)
+    dirpath = os.path.dirname(fn)
+    rar.extractall(path=dirpath)
+    return dirpath
+
+
+class Query(object):
+    """
+    a query-document pair used by learning-to-rank algorithms. It is
+    created from a relevance score and a query-document feature vector
+
+    Parameters:
+    ----------
+    query_id : int
+        query_id in the dataset, mapping the query to its judged documents
+    relevance_score : int
+        relevance score of the query-document pair
+    feature_vector : array, dense feature
+        feature in vector format
+    description : string
+        comment section in the query-doc pair data
+    """
+
+    def __init__(self,
+                 query_id=-1,
+                 relevance_score=-1,
+                 feature_vector=None,
+                 description=""):
+        self.query_id = query_id
+        self.relevance_score = relevance_score
+        if feature_vector is None:
+            self.feature_vector = []
+        else:
+            self.feature_vector = feature_vector
+        self.description = description
+
+    def __str__(self):
+        string = "%s %s %s" % (str(self.relevance_score), str(self.query_id),
+                               " ".join(str(f) for f in self.feature_vector))
+        return string
+
+    def _parse_(self, text):
+        """
+        parse a line of text into this Query
+        """
+        comment_position = text.find('#')
+        line = text[:comment_position].strip()
+        self.description = text[comment_position + 1:].strip()
+        parts = line.split()
+        if len(parts) != 48:
+            sys.stdout.write("expected 48 space-separated parts, got %d\n" %
+                             (len(parts)))
+            return None
+        # format : 0 qid:10 1:0.000272 2:0.000000 ....
+        self.relevance_score = int(parts[0])
+        self.query_id = int(parts[1].split(':')[1])
+        for p in parts[2:]:
+            pair = p.split(':')
+            self.feature_vector.append(float(pair[1]))
+        return self
+
+
+class QueryList(object):
+    """
+    group queries that share a query_id into a list; every item in the list is a Query
+    """
+
+    def __init__(self, querylist=None):
+        self.query_id = -1
+        if querylist is None:
+            self.querylist = []
+        else:
+            self.querylist = querylist
+            for query in self.querylist:
+                if self.query_id == -1:
+                    self.query_id = query.query_id
+                else:
+                    if self.query_id != query.query_id:
+                        raise ValueError("queries in a list must share the same query_id")
+
+    def __iter__(self):
+        for query in self.querylist:
+            yield query
+
+    def __len__(self):
+        return len(self.querylist)
+
+    def __getitem__(self, i):
+        return self.querylist[i]
+
+    def _correct_ranking_(self):
+        if self.querylist is None:
+            return
+        self.querylist.sort(key=lambda x: x.relevance_score, reverse=True)
+
+    def _add_query(self, query):
+        if self.query_id == -1:
+            self.query_id = query.query_id
+        else:
+            if self.query_id != query.query_id:
+                raise ValueError("queries in a list must share the same query_id")
+        self.querylist.append(query)
+
+
+def gen_plain_txt(querylist):
+    """
+    gen plain-text items in a list for other usage
+    Parameters:
+    --------
+    querylist : querylist, one query matches many document pairs in the list, see QueryList
+
+    return :
+    ------
+    query_id : np.array, shape=(samples_num, )
+    label : np.array, shape=(samples_num, )
+    querylist : np.array, shape=(samples_num, feature_dimension)
+    """
+    if not isinstance(querylist, QueryList):
+        querylist = QueryList(querylist)
+    querylist._correct_ranking_()
+    for query in querylist:
+        yield querylist.query_id, query.relevance_score, np.array(
+            query.feature_vector)
+
+
+def gen_point(querylist):
+    """
+    gen item in list for point-wise learning to rank algorithm
+    Parameters:
+    --------
+    querylist : querylist, one query matches many document pairs in the list, see QueryList
+
+    return :
+    ------
+    label : np.array, shape=(samples_num, )
+    querylist : np.array, shape=(samples_num, feature_dimension)
+    """
+    if not isinstance(querylist, QueryList):
+        querylist = QueryList(querylist)
+    querylist._correct_ranking_()
+    for query in querylist:
+        yield query.relevance_score, np.array(query.feature_vector)
+
+
+def gen_pair(querylist, partial_order="full"):
+    """
+    gen pair for pair-wise learning to rank algorithm
+    Parameters:
+    --------
+    querylist : querylist, one query matches many document pairs in the list, see QueryList
+    partial_order : "full" or "neighbour"
+        the full set of pair combinations is redundant and can be simplified:
+        generate pairs for neighbouring items only, or for the full partial order
+
+    return :
+    ------
+    label : np.array, shape=(1)
+    query_left : np.array, shape=(1, feature_dimension)
+    query_right : same as left
+    """
+    if not isinstance(querylist, QueryList):
+        querylist = QueryList(querylist)
+    querylist._correct_ranking_()
+    labels = []
+    docpairs = []
+
+    # C(n,2)
+    for i in range(len(querylist)):
+        query_left = querylist[i]
+        for j in range(i + 1, len(querylist)):
+            query_right = querylist[j]
+            if query_left.relevance_score > query_right.relevance_score:
+                labels.append(1)
+                docpairs.append([
+                    np.array(query_left.feature_vector),
+                    np.array(query_right.feature_vector)
+                ])
+            elif query_left.relevance_score < query_right.relevance_score:
+                # swap so the more relevant document always comes first
+                # and the label stays 1.
+                labels.append(1)
+                docpairs.append([
+                    np.array(query_right.feature_vector),
+                    np.array(query_left.feature_vector)
+                ])
+    for label, pair in zip(labels, docpairs):
+        yield label,
pair[0], pair[1]
+
+
+def gen_list(querylist):
+    """
+    gen item in list for list-wise learning to rank algorithm
+    Parameters:
+    --------
+    querylist : querylist, one query matches many document pairs in the list, see QueryList
+
+    return :
+    ------
+    label : np.array, shape=(samples_num, )
+    querylist : np.array, shape=(samples_num, feature_dimension)
+    """
+    if not isinstance(querylist, QueryList):
+        querylist = QueryList(querylist)
+    querylist._correct_ranking_()
+    relevance_score_list = [query.relevance_score for query in querylist]
+    feature_vector_list = [query.feature_vector for query in querylist]
+    yield np.array(relevance_score_list).T, np.array(feature_vector_list)
+
+
+def query_filter(querylists):
+    """
+    drop query lists that contain no relevant document, i.e. lists in
+    which every document has relevance score 0.
+    labels 0, 1, 2 are the relevance scores of a document for the query
+    Parameters :
+        querylists : list of QueryList
+
+    Returns :
+        filter_query : list of QueryList
+    """
+    filter_query = []
+    for querylist in querylists:
+        relevance_score_list = [query.relevance_score for query in querylist]
+        if sum(relevance_score_list) != 0:
+            filter_query.append(querylist)
+    return filter_query
+
+
+def load_from_text(filepath, shuffle=True, fill_missing=-1):
+    """
+    parse a data file into a list of QueryList
+    """
+    prev_query_id = -1
+    querylists = []
+    querylist = None
+    fn = __initialize_meta_info__()
+    with open(os.path.join(fn, filepath)) as f:
+        for line in f:
+            query = Query()
+            query = query._parse_(line)
+            if query is None:
+                continue
+            if query.query_id != prev_query_id:
+                if querylist is not None:
+                    querylists.append(querylist)
+                querylist = QueryList()
+                prev_query_id = query.query_id
+            querylist._add_query(query)
+    if querylist is not None:
+        querylists.append(querylist)
+    if shuffle:
+        random.shuffle(querylists)
+    return querylists
+
+
+def __reader__(filepath, format="pairwise", shuffle=True, fill_missing=-1):
+    """
+    Parameters
+    --------
+    filepath : string
+    shuffle : whether to shuffle query-doc pairs under the same query
+    fill_missing : fill the missing value. default in MQ2007 is -1
+
+    Returns
+    ------
+    yield
+        label query_left, query_right  # format = "pairwise"
+        label querylist                # format = "listwise"
+    """
+    querylists = query_filter(
+        load_from_text(
+            filepath, shuffle=shuffle, fill_missing=fill_missing))
+    for querylist in querylists:
+        if format == "plain_txt":
+            yield next(gen_plain_txt(querylist))
+        elif format == "pointwise":
+            yield next(gen_point(querylist))
+        elif format == "pairwise":
+            for pair in gen_pair(querylist):
+                yield pair
+        elif format == "listwise":
+            yield next(gen_list(querylist))
+
+
+train = functools.partial(__reader__, filepath="MQ2007/MQ2007/Fold1/train.txt")
+test = functools.partial(__reader__, filepath="MQ2007/MQ2007/Fold1/test.txt")
+
+
+def fetch():
+    return download(URL, "MQ2007", MD5)
+
+
+if __name__ == "__main__":
+    fetch()
+    mytest = functools.partial(
+        __reader__, filepath="MQ2007/MQ2007/Fold1/sample", format="listwise")
+    for label, query in mytest():
+        print label, query
diff --git a/python/paddle/v2/dataset/tests/mq2007_test.py b/python/paddle/v2/dataset/tests/mq2007_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..59847b6c18eadb12123cae824e8bce1051a69d4c
--- /dev/null
+++ b/python/paddle/v2/dataset/tests/mq2007_test.py
@@ -0,0 +1,33 @@
+# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import paddle.v2.dataset.mq2007
+import unittest
+
+
+class TestMQ2007(unittest.TestCase):
+    def test_pairwise(self):
+        for label, query_left, query_right in paddle.v2.dataset.mq2007.test(
+                format="pairwise"):
+            # shape is an attribute of numpy arrays, not a method.
+            self.assertEqual(query_left.shape, (46, ))
+            self.assertEqual(query_right.shape, (46, ))
+
+    def test_listwise(self):
+        for label_array, query_array in paddle.v2.dataset.mq2007.test(
+                format="listwise"):
+            self.assertEqual(len(label_array), len(query_array))
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python/setup.py.in b/python/setup.py.in
index 5dfb46192ae54fdc36b0867312cf156aefb84f84..1afaffd261702738cdd552636a26dfcfbfe5ac48 100644
--- a/python/setup.py.in
+++ b/python/setup.py.in
@@ -1,5 +1,6 @@
 from setuptools import setup
 
+
 packages=['paddle',
           'paddle.proto',
           'paddle.trainer',
@@ -18,6 +19,7 @@ setup(name='paddle',
           "numpy",
           "protobuf==${PROTOBUF_VERSION}",
           "matplotlib",
+          "rarfile"
       ],
       packages=packages,
       package_dir={