提交 28e98072 编写于 作者: L Luo Tao

Merge branch 'develop' into rename

...@@ -49,6 +49,7 @@ before_install: ...@@ -49,6 +49,7 @@ before_install:
# Paddle is using protobuf 3.1 currently. Protobuf 3.2 breaks the compatibility. So we specify the python # Paddle is using protobuf 3.1 currently. Protobuf 3.2 breaks the compatibility. So we specify the python
# protobuf version. # protobuf version.
- pip install numpy wheel 'protobuf==3.1' sphinx==1.5.6 recommonmark sphinx-rtd-theme==0.1.9 virtualenv pre-commit requests==2.9.2 LinkChecker - pip install numpy wheel 'protobuf==3.1' sphinx==1.5.6 recommonmark sphinx-rtd-theme==0.1.9 virtualenv pre-commit requests==2.9.2 LinkChecker
- pip install rarfile
- | - |
function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; } function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; }
script: script:
......
...@@ -56,7 +56,8 @@ RUN pip install --upgrade pip && \ ...@@ -56,7 +56,8 @@ RUN pip install --upgrade pip && \
pip install -U docopt PyYAML sphinx && \ pip install -U docopt PyYAML sphinx && \
pip install -U sphinx-rtd-theme==0.1.9 recommonmark && \ pip install -U sphinx-rtd-theme==0.1.9 recommonmark && \
pip install pre-commit 'requests==2.9.2' 'ipython==5.3.0' && \ pip install pre-commit 'requests==2.9.2' 'ipython==5.3.0' && \
pip install 'ipykernel==4.6.0' 'jupyter==1.0.0' pip install 'ipykernel==4.6.0' 'jupyter==1.0.0' && \
pip install rarfile
# To fix https://github.com/PaddlePaddle/Paddle/issues/1954, we use # To fix https://github.com/PaddlePaddle/Paddle/issues/1954, we use
# the solution in https://urllib3.readthedocs.io/en/latest/user-guide.html#ssl-py2 # the solution in https://urllib3.readthedocs.io/en/latest/user-guide.html#ssl-py2
......
...@@ -21,9 +21,12 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False): ...@@ -21,9 +21,12 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False):
size=word_vector_dim, size=word_vector_dim,
param_attr=paddle.attr.ParamAttr(name='_source_language_embedding')) param_attr=paddle.attr.ParamAttr(name='_source_language_embedding'))
src_forward = paddle.networks.simple_gru( src_forward = paddle.networks.simple_gru(
input=src_embedding, size=encoder_size) name='src_forward_gru', input=src_embedding, size=encoder_size)
src_backward = paddle.networks.simple_gru( src_backward = paddle.networks.simple_gru(
input=src_embedding, size=encoder_size, reverse=True) name='src_backward_gru',
input=src_embedding,
size=encoder_size,
reverse=True)
encoded_vector = paddle.layer.concat(input=[src_forward, src_backward]) encoded_vector = paddle.layer.concat(input=[src_forward, src_backward])
#### Decoder #### Decoder
...@@ -34,7 +37,9 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False): ...@@ -34,7 +37,9 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False):
backward_first = paddle.layer.first_seq(input=src_backward) backward_first = paddle.layer.first_seq(input=src_backward)
with paddle.layer.mixed( with paddle.layer.mixed(
size=decoder_size, act=paddle.activation.Tanh()) as decoder_boot: name="decoder_boot_mixed",
size=decoder_size,
act=paddle.activation.Tanh()) as decoder_boot:
decoder_boot += paddle.layer.full_matrix_projection( decoder_boot += paddle.layer.full_matrix_projection(
input=backward_first) input=backward_first)
...@@ -44,11 +49,17 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False): ...@@ -44,11 +49,17 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False):
name='gru_decoder', size=decoder_size, boot_layer=decoder_boot) name='gru_decoder', size=decoder_size, boot_layer=decoder_boot)
context = paddle.networks.simple_attention( context = paddle.networks.simple_attention(
name="simple_attention",
encoded_sequence=enc_vec, encoded_sequence=enc_vec,
encoded_proj=enc_proj, encoded_proj=enc_proj,
decoder_state=decoder_mem) decoder_state=decoder_mem)
with paddle.layer.mixed(size=decoder_size * 3) as decoder_inputs: with paddle.layer.mixed(
name="input_recurrent",
size=decoder_size * 3,
# enable error clipping
layer_attr=paddle.attr.ExtraAttr(
error_clipping_threshold=100.0)) as decoder_inputs:
decoder_inputs += paddle.layer.full_matrix_projection(input=context) decoder_inputs += paddle.layer.full_matrix_projection(input=context)
decoder_inputs += paddle.layer.full_matrix_projection( decoder_inputs += paddle.layer.full_matrix_projection(
input=current_word) input=current_word)
...@@ -57,9 +68,12 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False): ...@@ -57,9 +68,12 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False):
name='gru_decoder', name='gru_decoder',
input=decoder_inputs, input=decoder_inputs,
output_mem=decoder_mem, output_mem=decoder_mem,
# uncomment to enable local threshold for gradient clipping
# param_attr=paddle.attr.ParamAttr(gradient_clipping_threshold=9.9),
size=decoder_size) size=decoder_size)
with paddle.layer.mixed( with paddle.layer.mixed(
name="gru_step_output",
size=target_dict_dim, size=target_dict_dim,
bias_attr=True, bias_attr=True,
act=paddle.activation.Softmax()) as out: act=paddle.activation.Softmax()) as out:
...@@ -125,7 +139,13 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False): ...@@ -125,7 +139,13 @@ def seqToseq_net(source_dict_dim, target_dict_dim, is_generating=False):
def main(): def main():
paddle.init(use_gpu=False, trainer_count=1) paddle.init(
use_gpu=False,
trainer_count=1,
# log gradient clipping info
log_clipping=True,
# log error clipping info
log_error_clipping=True)
is_generating = False is_generating = False
# source and target dict dim. # source and target dict dim.
...@@ -140,6 +160,8 @@ def main(): ...@@ -140,6 +160,8 @@ def main():
# define optimize method and trainer # define optimize method and trainer
optimizer = paddle.optimizer.Adam( optimizer = paddle.optimizer.Adam(
learning_rate=5e-5, learning_rate=5e-5,
# uncomment to enable global threshold for gradient clipping
# gradient_clipping_threshold=10.0,
regularization=paddle.optimizer.L2Regularization(rate=8e-4)) regularization=paddle.optimizer.L2Regularization(rate=8e-4))
trainer = paddle.trainer.SGD(cost=cost, trainer = paddle.trainer.SGD(cost=cost,
parameters=parameters, parameters=parameters,
......
...@@ -2,8 +2,10 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR}) ...@@ -2,8 +2,10 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR})
go_library(adder SRCS adder.go) go_library(adder SRCS adder.go)
cc_test(cgo_test if (WITH_TESTING)
cc_test(cgo_test
SRCS SRCS
cgo_test.cc cgo_test.cc
DEPS DEPS
adder) adder)
endif()
...@@ -2,12 +2,9 @@ cmake_minimum_required(VERSION 3.0) ...@@ -2,12 +2,9 @@ cmake_minimum_required(VERSION 3.0)
if(GTEST_INCLUDE_DIR AND GTEST_LIBRARIES) if(GTEST_INCLUDE_DIR AND GTEST_LIBRARIES)
message("-- Found gtest (include: ${GTEST_INCLUDE_DIR}, library: ${GTEST_LIBRARIES})") message("-- Found gtest (include: ${GTEST_INCLUDE_DIR}, library: ${GTEST_LIBRARIES})")
else() else()
# find #include <majel/xx.h>
get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
include_directories(${PARENT_DIR})
# find cmake directory modules # find cmake directory modules
get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY) get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY)
get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY) get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY)
......
cmake_minimum_required(VERSION 3.0) cmake_minimum_required(VERSION 3.0)
include_directories(/env/gopath/src/github.com/PaddlePaddle/Paddle/paddle/go/cclient/build/) include_directories(${CMAKE_BINARY_DIR})
add_executable(main main.c) add_executable(main main.c)
add_dependencies(main client) add_dependencies(main client)
set (CMAKE_EXE_LINKER_FLAGS "-pthread") set (CMAKE_EXE_LINKER_FLAGS "-pthread")
target_link_libraries(main /env/gopath/src/github.com/PaddlePaddle/Paddle/paddle/go/cclient/build/libclient.a) # ${GTEST_LIBRARIES}) target_link_libraries(main ${CMAKE_BINARY_DIR}/libclient.a)
# RecordIO
## Write
```go
f, e := os.Create("a_file.recordio")
w := recordio.NewWriter(f)
w.Write([]byte("Hello"))
w.Write([]byte("World!"))
w.Close()
```
## Read
1. Load chunk index:
```go
f, e := os.Open("a_file.recordio")
idx, e := recordio.LoadIndex(f)
fmt.Println("Total records: ", idx.Len())
```
2. Create one or more scanner to read a range of records. The
following example reads the range
[1, 3), i.e., the second and the third records:
```go
f, e := os.Open("a_file.recordio")
s := recrodio.NewScanner(f, idx, 1, 3)
for s.Scan() {
fmt.Println(string(s.Record()))
}
if s.Err() != nil && s.Err() != io.EOF {
log.Fatalf("Something wrong with scanning: %v", e)
}
```
package recordio
import (
"bytes"
"compress/gzip"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"github.com/golang/snappy"
)
// A Chunk contains the Header and optionally compressed records. To
// create a chunk, just use ch := &Chunk{}.
type Chunk struct {
records [][]byte
numBytes int // sum of record lengths.
}
func (ch *Chunk) add(record []byte) {
ch.records = append(ch.records, record)
ch.numBytes += len(record)
}
// dump the chunk into w, and clears the chunk and makes it ready for
// the next add invocation.
func (ch *Chunk) dump(w io.Writer, compressorIndex int) error {
// NOTE: don't check ch.numBytes instead, because empty
// records are allowed.
if len(ch.records) == 0 {
return nil
}
// Write raw records and their lengths into data buffer.
var data bytes.Buffer
for _, r := range ch.records {
var rs [4]byte
binary.LittleEndian.PutUint32(rs[:], uint32(len(r)))
if _, e := data.Write(rs[:]); e != nil {
return fmt.Errorf("Failed to write record length: %v", e)
}
if _, e := data.Write(r); e != nil {
return fmt.Errorf("Failed to write record: %v", e)
}
}
compressed, e := compressData(&data, compressorIndex)
if e != nil {
return e
}
// Write chunk header and compressed data.
hdr := &Header{
checkSum: crc32.ChecksumIEEE(compressed.Bytes()),
compressor: uint32(compressorIndex),
compressedSize: uint32(compressed.Len()),
numRecords: uint32(len(ch.records)),
}
if _, e := hdr.write(w); e != nil {
return fmt.Errorf("Failed to write chunk header: %v", e)
}
if _, e := w.Write(compressed.Bytes()); e != nil {
return fmt.Errorf("Failed to write chunk data: %v", e)
}
// Clear the current chunk.
ch.records = nil
ch.numBytes = 0
return nil
}
type noopCompressor struct {
*bytes.Buffer
}
func (c *noopCompressor) Close() error {
return nil
}
func compressData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) {
compressed := new(bytes.Buffer)
var compressor io.WriteCloser
switch compressorIndex {
case NoCompression:
compressor = &noopCompressor{compressed}
case Snappy:
compressor = snappy.NewBufferedWriter(compressed)
case Gzip:
compressor = gzip.NewWriter(compressed)
default:
return nil, fmt.Errorf("Unknown compression algorithm: %d", compressorIndex)
}
if _, e := io.Copy(compressor, src); e != nil {
return nil, fmt.Errorf("Failed to compress chunk data: %v", e)
}
compressor.Close()
return compressed, nil
}
// parse the specified chunk from r.
func parseChunk(r io.ReadSeeker, chunkOffset int64) (*Chunk, error) {
var e error
var hdr *Header
if _, e = r.Seek(chunkOffset, io.SeekStart); e != nil {
return nil, fmt.Errorf("Failed to seek chunk: %v", e)
}
hdr, e = parseHeader(r)
if e != nil {
return nil, fmt.Errorf("Failed to parse chunk header: %v", e)
}
var buf bytes.Buffer
if _, e = io.CopyN(&buf, r, int64(hdr.compressedSize)); e != nil {
return nil, fmt.Errorf("Failed to read chunk data: %v", e)
}
if hdr.checkSum != crc32.ChecksumIEEE(buf.Bytes()) {
return nil, fmt.Errorf("Checksum checking failed.")
}
deflated, e := deflateData(&buf, int(hdr.compressor))
if e != nil {
return nil, e
}
ch := &Chunk{}
for i := 0; i < int(hdr.numRecords); i++ {
var rs [4]byte
if _, e = deflated.Read(rs[:]); e != nil {
return nil, fmt.Errorf("Failed to read record length: %v", e)
}
r := make([]byte, binary.LittleEndian.Uint32(rs[:]))
if _, e = deflated.Read(r); e != nil {
return nil, fmt.Errorf("Failed to read a record: %v", e)
}
ch.records = append(ch.records, r)
ch.numBytes += len(r)
}
return ch, nil
}
func deflateData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) {
var e error
var deflator io.Reader
switch compressorIndex {
case NoCompression:
deflator = src
case Snappy:
deflator = snappy.NewReader(src)
case Gzip:
deflator, e = gzip.NewReader(src)
if e != nil {
return nil, fmt.Errorf("Failed to create gzip reader: %v", e)
}
default:
return nil, fmt.Errorf("Unknown compression algorithm: %d", compressorIndex)
}
deflated := new(bytes.Buffer)
if _, e = io.Copy(deflated, deflator); e != nil {
return nil, fmt.Errorf("Failed to deflate chunk data: %v", e)
}
return deflated, nil
}
package recordio
import (
"encoding/binary"
"fmt"
"io"
)
const (
// NoCompression means writing raw chunk data into files.
// With other choices, chunks are compressed before written.
NoCompression = iota
// Snappy had been the default compressing algorithm widely
// used in Google. It compromises between speech and
// compression ratio.
Snappy
// Gzip is a well-known compression algorithm. It is
// recommmended only you are looking for compression ratio.
Gzip
magicNumber uint32 = 0x01020304
defaultCompressor = Snappy
)
// Header is the metadata of Chunk.
type Header struct {
checkSum uint32
compressor uint32
compressedSize uint32
numRecords uint32
}
func (c *Header) write(w io.Writer) (int, error) {
var buf [20]byte
binary.LittleEndian.PutUint32(buf[0:4], magicNumber)
binary.LittleEndian.PutUint32(buf[4:8], c.checkSum)
binary.LittleEndian.PutUint32(buf[8:12], c.compressor)
binary.LittleEndian.PutUint32(buf[12:16], c.compressedSize)
binary.LittleEndian.PutUint32(buf[16:20], c.numRecords)
return w.Write(buf[:])
}
func parseHeader(r io.Reader) (*Header, error) {
var buf [20]byte
if _, e := r.Read(buf[:]); e != nil {
return nil, e
}
if v := binary.LittleEndian.Uint32(buf[0:4]); v != magicNumber {
return nil, fmt.Errorf("Failed to parse magic number")
}
return &Header{
checkSum: binary.LittleEndian.Uint32(buf[4:8]),
compressor: binary.LittleEndian.Uint32(buf[8:12]),
compressedSize: binary.LittleEndian.Uint32(buf[12:16]),
numRecords: binary.LittleEndian.Uint32(buf[16:20]),
}, nil
}
package recordio
import "io"
// Index consists offsets and sizes of the consequetive chunks in a RecordIO file.
type Index struct {
chunkOffsets []int64
chunkLens []uint32
numRecords int // the number of all records in a file.
chunkRecords []int // the number of records in chunks.
}
// LoadIndex scans the file and parse chunkOffsets, chunkLens, and len.
func LoadIndex(r io.ReadSeeker) (*Index, error) {
f := &Index{}
offset := int64(0)
var e error
var hdr *Header
for {
hdr, e = parseHeader(r)
if e != nil {
break
}
f.chunkOffsets = append(f.chunkOffsets, offset)
f.chunkLens = append(f.chunkLens, hdr.numRecords)
f.chunkRecords = append(f.chunkRecords, int(hdr.numRecords))
f.numRecords += int(hdr.numRecords)
offset, e = r.Seek(int64(hdr.compressedSize), io.SeekCurrent)
if e != nil {
break
}
}
if e == io.EOF {
return f, nil
}
return nil, e
}
// NumRecords returns the total number of records in a RecordIO file.
func (r *Index) NumRecords() int {
return r.numRecords
}
// NumChunks returns the total number of chunks in a RecordIO file.
func (r *Index) NumChunks() int {
return len(r.chunkLens)
}
// ChunkIndex return the Index of i-th Chunk.
func (r *Index) ChunkIndex(i int) *Index {
idx := &Index{}
idx.chunkOffsets = []int64{r.chunkOffsets[i]}
idx.chunkLens = []uint32{r.chunkLens[i]}
idx.chunkRecords = []int{r.chunkRecords[i]}
idx.numRecords = idx.chunkRecords[0]
return idx
}
// Locate returns the index of chunk that contains the given record,
// and the record index within the chunk. It returns (-1, -1) if the
// record is out of range.
func (r *Index) Locate(recordIndex int) (int, int) {
sum := 0
for i, l := range r.chunkLens {
sum += int(l)
if recordIndex < sum {
return i, recordIndex - sum + int(l)
}
}
return -1, -1
}
// Scanner scans records in a specified range within [0, numRecords).
type Scanner struct {
reader io.ReadSeeker
index *Index
start, end, cur int
chunkIndex int
chunk *Chunk
err error
}
// NewScanner creates a scanner that sequencially reads records in the
// range [start, start+len). If start < 0, it scans from the
// beginning. If len < 0, it scans till the end of file.
func NewScanner(r io.ReadSeeker, index *Index, start, len int) *Scanner {
if start < 0 {
start = 0
}
if len < 0 || start+len >= index.NumRecords() {
len = index.NumRecords() - start
}
return &Scanner{
reader: r,
index: index,
start: start,
end: start + len,
cur: start - 1, // The intial status required by Scan.
chunkIndex: -1,
chunk: &Chunk{},
}
}
// Scan moves the cursor forward for one record and loads the chunk
// containing the record if not yet.
func (s *Scanner) Scan() bool {
s.cur++
if s.cur >= s.end {
s.err = io.EOF
} else {
if ci, _ := s.index.Locate(s.cur); s.chunkIndex != ci {
s.chunkIndex = ci
s.chunk, s.err = parseChunk(s.reader, s.index.chunkOffsets[ci])
}
}
return s.err == nil
}
// Record returns the record under the current cursor.
func (s *Scanner) Record() []byte {
_, ri := s.index.Locate(s.cur)
return s.chunk.records[ri]
}
// Error returns the error that stopped Scan.
func (s *Scanner) Error() error {
return s.err
}
package recordio
import (
"bytes"
"testing"
"unsafe"
"github.com/stretchr/testify/assert"
)
func TestChunkHead(t *testing.T) {
assert := assert.New(t)
c := &Header{
checkSum: 123,
compressor: 456,
compressedSize: 789,
}
var buf bytes.Buffer
_, e := c.write(&buf)
assert.Nil(e)
cc, e := parseHeader(&buf)
assert.Nil(e)
assert.Equal(c, cc)
}
func TestWriteAndRead(t *testing.T) {
assert := assert.New(t)
data := []string{
"12345",
"1234",
"12"}
var buf bytes.Buffer
w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize.
n, e := w.Write([]byte(data[0])) // not exceed chunk size.
assert.Nil(e)
assert.Equal(5, n)
n, e = w.Write([]byte(data[1])) // not exceed chunk size.
assert.Nil(e)
assert.Equal(4, n)
n, e = w.Write([]byte(data[2])) // exeeds chunk size, dump and create a new chunk.
assert.Nil(e)
assert.Equal(n, 2)
assert.Nil(w.Close()) // flush the second chunk.
assert.Nil(w.Writer)
n, e = w.Write([]byte("anything")) // not effective after close.
assert.NotNil(e)
assert.Equal(n, 0)
idx, e := LoadIndex(bytes.NewReader(buf.Bytes()))
assert.Nil(e)
assert.Equal([]uint32{2, 1}, idx.chunkLens)
assert.Equal(
[]int64{0,
int64(4 + // magic number
unsafe.Sizeof(Header{}) +
5 + // first record
4 + // second record
2*4)}, // two record legnths
idx.chunkOffsets)
s := NewScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1)
i := 0
for s.Scan() {
assert.Equal(data[i], string(s.Record()))
i++
}
}
func TestWriteEmptyFile(t *testing.T) {
assert := assert.New(t)
var buf bytes.Buffer
w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize.
assert.Nil(w.Close())
assert.Equal(0, buf.Len())
idx, e := LoadIndex(bytes.NewReader(buf.Bytes()))
assert.Nil(e)
assert.Equal(0, idx.NumRecords())
}
package recordio_test
import (
"bytes"
"reflect"
"testing"
"github.com/PaddlePaddle/Paddle/paddle/go/recordio"
)
func TestWriteRead(t *testing.T) {
const total = 1000
var buf bytes.Buffer
w := recordio.NewWriter(&buf, 0, -1)
for i := 0; i < total; i++ {
_, err := w.Write(make([]byte, i))
if err != nil {
t.Fatal(err)
}
}
w.Close()
idx, err := recordio.LoadIndex(bytes.NewReader(buf.Bytes()))
if err != nil {
t.Fatal(err)
}
if idx.NumRecords() != total {
t.Fatal("num record does not match:", idx.NumRecords(), total)
}
s := recordio.NewScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1)
i := 0
for s.Scan() {
if !reflect.DeepEqual(s.Record(), make([]byte, i)) {
t.Fatal("not equal:", len(s.Record()), len(make([]byte, i)))
}
i++
}
if i != total {
t.Fatal("total count not match:", i, total)
}
}
func TestChunkIndex(t *testing.T) {
const total = 1000
var buf bytes.Buffer
w := recordio.NewWriter(&buf, 0, -1)
for i := 0; i < total; i++ {
_, err := w.Write(make([]byte, i))
if err != nil {
t.Fatal(err)
}
}
w.Close()
idx, err := recordio.LoadIndex(bytes.NewReader(buf.Bytes()))
if err != nil {
t.Fatal(err)
}
if idx.NumChunks() != total {
t.Fatal("unexpected chunk num:", idx.NumChunks(), total)
}
for i := 0; i < total; i++ {
newIdx := idx.ChunkIndex(i)
s := recordio.NewScanner(bytes.NewReader(buf.Bytes()), newIdx, -1, -1)
j := 0
for s.Scan() {
if !reflect.DeepEqual(s.Record(), make([]byte, i)) {
t.Fatal("not equal:", len(s.Record()), len(make([]byte, i)))
}
j++
}
if j != 1 {
t.Fatal("unexpected record per chunk:", j)
}
}
}
package recordio
import (
"fmt"
"io"
)
const (
defaultMaxChunkSize = 32 * 1024 * 1024
)
// Writer creates a RecordIO file.
type Writer struct {
io.Writer // Set to nil to mark a closed writer.
chunk *Chunk
maxChunkSize int // total records size, excluding metadata, before compression.
compressor int
}
// NewWriter creates a RecordIO file writer. Each chunk is compressed
// using the deflate algorithm given compression level. Note that
// level 0 means no compression and -1 means default compression.
func NewWriter(w io.Writer, maxChunkSize, compressor int) *Writer {
if maxChunkSize < 0 {
maxChunkSize = defaultMaxChunkSize
}
if compressor < 0 {
compressor = defaultCompressor
}
return &Writer{
Writer: w,
chunk: &Chunk{},
maxChunkSize: maxChunkSize,
compressor: compressor}
}
// Writes a record. It returns an error if Close has been called.
func (w *Writer) Write(record []byte) (int, error) {
if w.Writer == nil {
return 0, fmt.Errorf("Cannot write since writer had been closed")
}
if w.chunk.numBytes+len(record) > w.maxChunkSize {
if e := w.chunk.dump(w.Writer, w.compressor); e != nil {
return 0, e
}
}
w.chunk.add(record)
return len(record), nil
}
// Close flushes the current chunk and makes the writer invalid.
func (w *Writer) Close() error {
e := w.chunk.dump(w.Writer, w.compressor)
w.Writer = nil
return e
}
...@@ -161,6 +161,7 @@ void AdaDeltaParameterOptimizer::update(const VectorPtr vecs[], ...@@ -161,6 +161,7 @@ void AdaDeltaParameterOptimizer::update(const VectorPtr vecs[],
const ParameterConfig& config, const ParameterConfig& config,
size_t sparseId) const { size_t sparseId) const {
CHECK(sparseId == -1LU) << "Sparse update is not supported"; CHECK(sparseId == -1LU) << "Sparse update is not supported";
BaseMatrix& value = *vecs[PARAMETER_VALUE]; BaseMatrix& value = *vecs[PARAMETER_VALUE];
BaseMatrix& grad = *vecs[PARAMETER_GRADIENT]; BaseMatrix& grad = *vecs[PARAMETER_GRADIENT];
BaseMatrix& mom = *vecs[PARAMETER_MOMENTUM]; BaseMatrix& mom = *vecs[PARAMETER_MOMENTUM];
...@@ -265,6 +266,7 @@ void AdamParameterOptimizer::update(const VectorPtr vecs[], ...@@ -265,6 +266,7 @@ void AdamParameterOptimizer::update(const VectorPtr vecs[],
const ParameterConfig& config, const ParameterConfig& config,
size_t sparseId) const { size_t sparseId) const {
CHECK(sparseId == -1UL) << "Sparse update is not supported"; CHECK(sparseId == -1UL) << "Sparse update is not supported";
real beta1_power = std::pow(beta1_, step_); real beta1_power = std::pow(beta1_, step_);
real beta2_power = std::pow(beta2_, step_); real beta2_power = std::pow(beta2_, step_);
real learningRate = config.learning_rate() * learningRate_; real learningRate = config.learning_rate() * learningRate_;
...@@ -303,18 +305,25 @@ void AdamaxParameterOptimizer::update(const VectorPtr vecs[], ...@@ -303,18 +305,25 @@ void AdamaxParameterOptimizer::update(const VectorPtr vecs[],
void OptimizerWithGradientClipping::update(const VectorPtr vecs[], void OptimizerWithGradientClipping::update(const VectorPtr vecs[],
const ParameterConfig& config, const ParameterConfig& config,
size_t sparseId) const { size_t sparseId) const {
real globalThreshold = optConfig_.gradient_clipping_threshold();
real localThreshold = config.gradient_clipping_threshold();
// Use local gradient clipping threshold if it's enabled,
// otherwise using the global one.
real threshold = localThreshold > 0.0f ? localThreshold : globalThreshold;
std::string field = localThreshold > 0.0f ? "local" : "global";
real maxAbsGrad = vecs[PARAMETER_GRADIENT]->getAbsMax(); real maxAbsGrad = vecs[PARAMETER_GRADIENT]->getAbsMax();
if (maxAbsGrad > config.gradient_clipping_threshold()) { if (maxAbsGrad > threshold) {
if (FLAGS_log_clipping) { if (FLAGS_log_clipping) {
real avgAbsGrad = vecs[PARAMETER_GRADIENT]->getAbsSum() / real avgAbsGrad = vecs[PARAMETER_GRADIENT]->getAbsSum() /
vecs[PARAMETER_GRADIENT]->getSize(); vecs[PARAMETER_GRADIENT]->getSize();
LOG(INFO) << "parameter=" << config.name() << " need clipping," LOG(INFO) << "parameter=" << config.name() << " need clipping by "
<< " max grad=" << maxAbsGrad << " avg grad=" << avgAbsGrad; << field << " threshold=" << threshold
<< ", max grad=" << maxAbsGrad << ", avg grad=" << avgAbsGrad;
} }
vecs[PARAMETER_GRADIENT]->clip(-config.gradient_clipping_threshold(), vecs[PARAMETER_GRADIENT]->clip(-threshold, threshold);
config.gradient_clipping_threshold());
} }
optimizer_->update(vecs, config, sparseId); optimizer_->update(vecs, config, sparseId);
} }
......
...@@ -131,7 +131,8 @@ ParameterOptimizer* OptimizerWithRegularizer::create( ...@@ -131,7 +131,8 @@ ParameterOptimizer* OptimizerWithRegularizer::create(
bool inPserver) { bool inPserver) {
ParameterOptimizer* optimizer = ParameterOptimizer* optimizer =
ParameterOptimizer::create(optConfig, inPserver); ParameterOptimizer::create(optConfig, inPserver);
if (paraConfig.gradient_clipping_threshold() > 0.0f && if ((optConfig.gradient_clipping_threshold() > 0.0f ||
paraConfig.gradient_clipping_threshold() > 0.0f) &&
!dynamic_cast<AddOptimizer*>(optimizer)) { !dynamic_cast<AddOptimizer*>(optimizer)) {
optimizer = new OptimizerWithGradientClipping(optConfig, optimizer); optimizer = new OptimizerWithGradientClipping(optConfig, optimizer);
} }
......
...@@ -167,6 +167,7 @@ public: ...@@ -167,6 +167,7 @@ public:
} }
parameterTypes_.push_back(type); parameterTypes_.push_back(type);
} }
real getLearningRate() const { return learningRate_; } real getLearningRate() const { return learningRate_; }
virtual void setNoDecay() { applyDecay_ = false; } virtual void setNoDecay() { applyDecay_ = false; }
...@@ -201,6 +202,7 @@ protected: ...@@ -201,6 +202,7 @@ protected:
* so, if lr change in StartBatch, please assign to learningRate_ * so, if lr change in StartBatch, please assign to learningRate_
*/ */
real learningRate_; real learningRate_;
std::unique_ptr<LearningRateScheduler> learningRateScheduler_; std::unique_ptr<LearningRateScheduler> learningRateScheduler_;
int64_t pass_; // current training pass (starting from 0) int64_t pass_; // current training pass (starting from 0)
bool firstTime_; bool firstTime_;
......
...@@ -29,7 +29,7 @@ if [ $USE_VIRTUALENV_FOR_TEST -ne 0 ]; then ...@@ -29,7 +29,7 @@ if [ $USE_VIRTUALENV_FOR_TEST -ne 0 ]; then
fi fi
export PYTHONPATH=$SCRIPTPATH/../../python/ export PYTHONPATH=$SCRIPTPATH/../../python/
$PYTHON -m pip install $SCRIPTPATH/../dist/*.whl requests matplotlib opencv-python ipython==5.3 $PYTHON -m pip install $SCRIPTPATH/../dist/*.whl requests matplotlib opencv-python ipython==5.3 rarfile
for fn in "$@" for fn in "$@"
do do
......
...@@ -128,6 +128,9 @@ message OptimizationConfig { ...@@ -128,6 +128,9 @@ message OptimizationConfig {
// when async_lagged_grad_discard_ratio * num_gradient_servers commit passed, // when async_lagged_grad_discard_ratio * num_gradient_servers commit passed,
// current async gradient will be discard silently. // current async gradient will be discard silently.
optional double async_lagged_grad_discard_ratio = 37 [default = 1.5]; optional double async_lagged_grad_discard_ratio = 37 [default = 1.5];
// global threshold for gradient clipping
optional double gradient_clipping_threshold = 38 [default = 0.0];
}; };
message TrainerConfig { message TrainerConfig {
......
...@@ -3377,6 +3377,7 @@ settings = dict( ...@@ -3377,6 +3377,7 @@ settings = dict(
algorithm='async_sgd', algorithm='async_sgd',
async_lagged_grad_discard_ratio=1.5, async_lagged_grad_discard_ratio=1.5,
learning_method='momentum', learning_method='momentum',
gradient_clipping_threshold=None,
num_batches_per_send_parameter=None, num_batches_per_send_parameter=None,
num_batches_per_get_parameter=None, num_batches_per_get_parameter=None,
center_parameter_update_method=None, center_parameter_update_method=None,
......
...@@ -408,7 +408,8 @@ def settings(batch_size, ...@@ -408,7 +408,8 @@ def settings(batch_size,
args = [ args = [
'batch_size', 'learning_rate', 'learning_rate_decay_a', 'batch_size', 'learning_rate', 'learning_rate_decay_a',
'learning_rate_decay_b', 'learning_rate_schedule', 'learning_rate_args' 'learning_rate_decay_b', 'learning_rate_schedule', 'learning_rate_args',
'gradient_clipping_threshold'
] ]
kwargs = dict() kwargs = dict()
kwargs['algorithm'] = algorithm kwargs['algorithm'] = algorithm
......
...@@ -24,8 +24,9 @@ import conll05 ...@@ -24,8 +24,9 @@ import conll05
import uci_housing import uci_housing
import sentiment import sentiment
import wmt14 import wmt14
import mq2007
__all__ = [ __all__ = [
'mnist', 'imikolov', 'imdb', 'cifar', 'movielens', 'conll05', 'sentiment' 'mnist', 'imikolov', 'imdb', 'cifar', 'movielens', 'conll05', 'sentiment'
'uci_housing', 'wmt14' 'uci_housing', 'wmt14', 'mq2007'
] ]
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MQ2007 dataset
MQ2007 is a query set from Million Query track of TREC 2007. There are about 1700 queries in it with labeled documents. In MQ2007, the 5-fold cross
validation strategy is adopted and the 5-fold partitions are included in the package. In each fold, there are three subsets for learning: training set,
validation set and testing set.
MQ2007 dataset from website
http://research.microsoft.com/en-us/um/beijing/projects/letor/LETOR4.0/Data/MQ2007.rar and parse training set and test set into paddle reader creators
"""
import os
import random
import functools
import rarfile
from common import download
import numpy as np
# URL = "http://research.microsoft.com/en-us/um/beijing/projects/letor/LETOR4.0/Data/MQ2007.rar"
URL = "http://www.bigdatalab.ac.cn/benchmark/upload/download_source/7b6dbbe2-842c-11e4-a536-bcaec51b9163_MQ2007.rar"
MD5 = "7be1640ae95c6408dab0ae7207bdc706"
def __initialize_meta_info__():
"""
download and extract the MQ2007 dataset
"""
fn = fetch()
rar = rarfile.RarFile(fn)
dirpath = os.path.dirname(fn)
rar.extractall(path=dirpath)
return dirpath
class Query(object):
"""
queries used for learning to rank algorithms. It is created from relevance scores, query-document feature vectors
Parameters:
----------
query_id : int
query_id in dataset, mapping from query to relevance documents
relevance_score : int
relevance score of query and document pair
feature_vector : array, dense feature
feature in vector format
description : string
comment section in query doc pair data
"""
def __init__(self,
query_id=-1,
relevance_score=-1,
feature_vector=None,
description=""):
self.query_id = query_id
self.relevance_score = relevance_score
if feature_vector is None:
self.feature_vector = []
else:
self.feature_vector = feature_vector
self.description = description
def __str__(self):
string = "%s %s %s" % (str(self.relevance_score), str(self.query_id),
" ".join(str(f) for f in self.feature_vector))
return string
# @classmethod
def _parse_(self, text):
"""
parse line into Query
"""
comment_position = text.find('#')
line = text[:comment_position].strip()
self.description = text[comment_position + 1:].strip()
parts = line.split()
if len(parts) != 48:
sys.stdout.write("expect 48 space split parts, get %d" %
(len(parts)))
return None
# format : 0 qid:10 1:0.000272 2:0.000000 ....
self.relevance_score = int(parts[0])
self.query_id = int(parts[1].split(':')[1])
for p in parts[2:]:
pair = p.split(':')
self.feature_vector.append(float(pair[1]))
return self
class QueryList(object):
"""
group query into list, every item in list is a Query
"""
def __init__(self, querylist=None):
self.query_id = -1
if querylist is None:
self.querylist = []
else:
self.querylist = querylist
for query in self.querylist:
if self.query_id == -1:
self.query_id = query.query_id
else:
if self.query_id != query.query_id:
raise ValueError("query in list must be same query_id")
def __iter__(self):
for query in self.querylist:
yield query
def __len__(self):
return len(self.querylist)
def __getitem__(self, i):
return self.querylist[i]
def _correct_ranking_(self):
if self.querylist is None:
return
self.querylist.sort(key=lambda x: x.relevance_score, reverse=True)
def _add_query(self, query):
if self.query_id == -1:
self.query_id = query.query_id
else:
if self.query_id != query.query_id:
raise ValueError("query in list must be same query_id")
self.querylist.append(query)
def gen_plain_txt(querylist):
"""
gen plain text in list for other usage
Paramters:
--------
querylist : querylist, one query match many docment pairs in list, see QueryList
return :
------
query_id : np.array, shape=(samples_num, )
label : np.array, shape=(samples_num, )
querylist : np.array, shape=(samples_num, feature_dimension)
"""
if not isinstance(querylist, QueryList):
querylist = QueryList(querylist)
querylist._correct_ranking_()
for query in querylist:
yield querylist.query_id, query.relevance_score, np.array(
query.feature_vector)
def gen_point(querylist):
"""
gen item in list for point-wise learning to rank algorithm
Paramters:
--------
querylist : querylist, one query match many docment pairs in list, see QueryList
return :
------
label : np.array, shape=(samples_num, )
querylist : np.array, shape=(samples_num, feature_dimension)
"""
if not isinstance(querylist, QueryList):
querylist = QueryList(querylist)
querylist._correct_ranking_()
for query in querylist:
yield query.relevance_score, np.array(query.feature_vector)
def gen_pair(querylist, partial_order="full"):
"""
gen pair for pair-wise learning to rank algorithm
Paramters:
--------
querylist : querylist, one query match many docment pairs in list, see QueryList
pairtial_order : "full" or "neighbour"
there is redudant in all possiable pair combinations, which can be simplifed
gen pairs for neighbour items or the full partial order pairs
return :
------
label : np.array, shape=(1)
query_left : np.array, shape=(1, feature_dimension)
query_right : same as left
"""
if not isinstance(querylist, QueryList):
querylist = QueryList(querylist)
querylist._correct_ranking_()
labels = []
docpairs = []
# C(n,2)
for i in range(len(querylist)):
query_left = querylist[i]
for j in range(i + 1, len(querylist)):
query_right = querylist[j]
if query_left.relevance_score > query_right.relevance_score:
labels.append(1)
docpairs.append([
np.array(query_left.feature_vector),
np.array(query_right.feature_vector)
])
elif query_left.relevance_score < query_right.relevance_score:
labels.append(1)
docpairs.append([
np.array(query_right.feature_vector),
np.array(query_left.feature_vector)
])
for label, pair in zip(labels, docpairs):
yield label, pair[0], pair[1]
def gen_list(querylist):
"""
gen item in list for list-wise learning to rank algorithm
Paramters:
--------
querylist : querylist, one query match many docment pairs in list, see QueryList
return :
------
label : np.array, shape=(samples_num, )
querylist : np.array, shape=(samples_num, feature_dimension)
"""
if not isinstance(querylist, QueryList):
querylist = QueryList(querylist)
querylist._correct_ranking_()
relevance_score_list = [query.relevance_score for query in querylist]
feature_vector_list = [query.feature_vector for query in querylist]
yield np.array(relevance_score_list).T, np.array(feature_vector_list)
def query_filter(querylists):
"""
filter query get only document with label 0.
label 0, 1, 2 means the relevance score document with query
parameters :
querylist : QueyList list
return :
querylist : QueyList list
"""
filter_query = []
for querylist in querylists:
relevance_score_list = [query.relevance_score for query in querylist]
if sum(relevance_score_list) != .0:
filter_query.append(querylist)
return filter_query
def load_from_text(filepath, shuffle=True, fill_missing=-1):
"""
parse data file into querys
"""
prev_query_id = -1
querylists = []
querylist = None
fn = __initialize_meta_info__()
with open(os.path.join(fn, filepath)) as f:
for line in f:
query = Query()
query = query._parse_(line)
if query == None:
continue
if query.query_id != prev_query_id:
if querylist is not None:
querylists.append(querylist)
querylist = QueryList()
prev_query_id = query.query_id
querylist._add_query(query)
if querylist is not None:
querylists.append(querylist)
if shuffle == True:
random.shuffle(querylists)
return querylists
def __reader__(filepath, format="pairwise", shuffle=True, fill_missing=-1):
"""
Parameters
--------
filename : string
shuffle : shuffle query-doc pair under the same query
fill_missing : fill the missing value. default in MQ2007 is -1
Returns
------
yield
label query_left, query_right # format = "pairwise"
label querylist # format = "listwise"
"""
querylists = query_filter(
load_from_text(
filepath, shuffle=shuffle, fill_missing=fill_missing))
for querylist in querylists:
if format == "plain_txt":
yield next(gen_plain_txt(querylist))
elif format == "pointwise":
yield next(gen_point(querylist))
elif format == "pairwise":
for pair in gen_pair(querylist):
yield pair
elif format == "listwise":
yield next(gen_list(querylist))
train = functools.partial(__reader__, filepath="MQ2007/MQ2007/Fold1/train.txt")
test = functools.partial(__reader__, filepath="MQ2007/MQ2007/Fold1/test.txt")
def fetch():
return download(URL, "MQ2007", MD5)
if __name__ == "__main__":
fetch()
mytest = functools.partial(
__reader__, filepath="MQ2007/MQ2007/Fold1/sample", format="listwise")
for label, query in mytest():
print label, query
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.v2.dataset.mq2007
import unittest
class TestMQ2007(unittest.TestCase):
def test_pairwise(self):
for label, query_left, query_right in paddle.v2.dataset.mq2007.test(
format="pairwise"):
self.assertEqual(query_left.shape(), (46, ))
self.assertEqual(query_right.shape(), (46, ))
def test_listwise(self):
for label_array, query_array in paddle.v2.dataset.mq2007.test(
format="listwise"):
self.assertEqual(len(label_array), len(query_array))
if __name__ == "__main__":
unittest.main()
from setuptools import setup from setuptools import setup
packages=['paddle', packages=['paddle',
'paddle.proto', 'paddle.proto',
'paddle.trainer', 'paddle.trainer',
...@@ -18,6 +19,7 @@ setup(name='paddle', ...@@ -18,6 +19,7 @@ setup(name='paddle',
"numpy", "numpy",
"protobuf==${PROTOBUF_VERSION}", "protobuf==${PROTOBUF_VERSION}",
"matplotlib", "matplotlib",
"rarfile"
], ],
packages=packages, packages=packages,
package_dir={ package_dir={
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册