Commit adc5210b authored by Liu Yiqun

Merge branch 'support_scalar_kernels' of https://github.com/Xreki/Paddle into support_scalar_kernels
@@ -16,3 +16,6 @@ third_party/
*~
bazel-*
third_party/
+# clion workspace.
+cmake-build-*
@@ -18,3 +18,4 @@ ExternalProject_Add(
)
add_definitions(-DANY_IMPL_ANY_CAST_MOVEABLE)
+LIST(APPEND external_project_dependencies linb_any)
\ No newline at end of file
@@ -14,6 +14,34 @@
INCLUDE(ExternalProject)
+macro(PROMPT_PROTOBUF_LIB)
+    MESSAGE(STATUS "Protobuf protoc executable: ${PROTOBUF_PROTOC_EXECUTABLE}")
+    MESSAGE(STATUS "Protobuf library: ${PROTOBUF_LIBRARY}")
+    MESSAGE(STATUS "Protobuf version: ${PROTOBUF_VERSION}")
+    INCLUDE_DIRECTORIES(${PROTOBUF_INCLUDE_DIR})
+    RETURN()
+endmacro()
+macro(SET_PROTOBUF_VERSION)
+    EXEC_PROGRAM(${PROTOBUF_PROTOC_EXECUTABLE} ARGS --version OUTPUT_VARIABLE PROTOBUF_VERSION)
+    STRING(REGEX MATCH "[0-9]+.[0-9]+" PROTOBUF_VERSION "${PROTOBUF_VERSION}")
+endmacro()
+set(PROTOBUF_ROOT "" CACHE PATH "Folder contains protobuf")
+if (NOT "${PROTOBUF_ROOT}" STREQUAL "")
+    find_path(PROTOBUF_INCLUDE_DIR google/protobuf/message.h PATHS ${PROTOBUF_ROOT}/include)
+    find_library(PROTOBUF_LIBRARY protobuf PATHS ${PROTOBUF_ROOT}/lib)
+    find_library(PROTOBUF_LITE_LIBRARY protobuf-lite PATHS ${PROTOBUF_ROOT}/lib)
+    find_library(PROTOBUF_PROTOC_LIBRARY protoc PATHS ${PROTOBUF_ROOT}/lib)
+    find_program(PROTOBUF_PROTOC_EXECUTABLE protoc PATHS ${PROTOBUF_ROOT}/bin)
+    if (PROTOBUF_INCLUDE_DIR AND PROTOBUF_LIBRARY AND PROTOBUF_LITE_LIBRARY AND PROTOBUF_PROTOC_LIBRARY AND PROTOBUF_PROTOC_EXECUTABLE)
+        message(STATUS "Using custom protobuf library in ${PROTOBUF_ROOT}.")
+        SET_PROTOBUF_VERSION()
+        PROMPT_PROTOBUF_LIB()
+    else()
+        message(WARNING "Cannot find protobuf library in ${PROTOBUF_ROOT}.")
+    endif()
+endif()
FUNCTION(build_protobuf TARGET_NAME BUILD_FOR_HOST)
    SET(PROTOBUF_SOURCES_DIR ${THIRD_PARTY_PATH}/${TARGET_NAME})
    SET(PROTOBUF_INSTALL_DIR ${THIRD_PARTY_PATH}/install/${TARGET_NAME})
@@ -78,8 +106,7 @@ IF(NOT CMAKE_CROSSCOMPILING)
    FIND_PACKAGE(Protobuf ${PROTOBUF_VERSION})
    IF(PROTOBUF_FOUND)
-        EXEC_PROGRAM(${PROTOBUF_PROTOC_EXECUTABLE} ARGS --version OUTPUT_VARIABLE PROTOBUF_VERSION)
-        STRING(REGEX MATCH "[0-9]+.[0-9]+" PROTOBUF_VERSION "${PROTOBUF_VERSION}")
+        SET_PROTOBUF_VERSION()
        IF("${PROTOBUF_VERSION}" VERSION_LESS "3.1.0")
            SET(PROTOBUF_FOUND OFF)
        ENDIF()
@@ -107,6 +134,4 @@ IF(NOT PROTOBUF_FOUND)
    SET(PROTOBUF_PROTOC_LIBRARY ${protobuf_PROTOC_LIBRARY} CACHE FILEPATH "protoc library." FORCE)
ENDIF(NOT PROTOBUF_FOUND)
-MESSAGE(STATUS "Protobuf protoc executable: ${PROTOBUF_PROTOC_EXECUTABLE}")
-MESSAGE(STATUS "Protobuf library: ${PROTOBUF_LIBRARY}")
-INCLUDE_DIRECTORIES(${PROTOBUF_INCLUDE_DIR})
+PROMPT_PROTOBUF_LIB()
\ No newline at end of file
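With this change, a pre-built protobuf can be supplied at configure time, e.g. `cmake -DPROTOBUF_ROOT=/opt/protobuf ..` (path illustrative); if any of the expected headers, libraries, or the `protoc` binary is missing under that root, a warning is printed and the build falls back to compiling protobuf from source as before.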
@@ -10,7 +10,7 @@ A dataset is a list of files in *RecordIO* format. A RecordIO file consists of c
## Task Queue
-As mentioned in [distributed training design doc](./README.md), a *task* is a data shard that the master server assigns to the trainer process to train on. A task consists of one or multiple *blocks* from one or multiple files. The master server maintains *task queues* to track the training progress.
+As mentioned in [distributed training design doc](./README.md), a *task* is a data shard that the master server assigns to the trainer process to train on. A task consists of one or multiple *chunks* from one or multiple files. The master server maintains *task queues* to track the training progress.
### Task Queue Creation
@@ -21,23 +21,23 @@ As mentioned in [distributed training design doc](./README.md), a *task* is a da
	func (m *RPCServer) ReportDataset(Paths []string, dummy *int) error {
	}
```
-1. The master server will scan through each RecordIO file to generate the *block index* and know how many blocks does each file have. A block can be referenced by the file path and the index of the block within the file. The block index is in memory data structure that enables fast access to each block, and the index of the block with the file is an integer start from 0, representing the n-th block within the file.
+1. The master server will scan through each RecordIO file to generate the *chunk index* and learn how many chunks each file has. A chunk can be referenced by the file path and the index of the chunk within the file. The chunk index is an in-memory data structure that enables fast access to each chunk; the index of a chunk within its file is an integer starting from 0, representing the n-th chunk in the file.
-   The definition of the block is:
+   The definition of the chunk is:
```go
-type Block struct {
-	Idx int // index of the block within the file
+type Chunk struct {
+	Idx int // index of the chunk within the file
	Path string
-	Index recordio.Index // block index
+	Index recordio.Index // chunk index
}
```
-1. Blocks are grouped into tasks, and tasks are filled into the todo queue. The pending queue and the done queue are initialized with no element.
+1. Chunks are grouped into tasks, and tasks are filled into the todo queue. The pending queue and the done queue are initialized empty; how tasks move between the queues is summarized below.
   The definition of the task is:
```go
type Task struct {
	Index int
-	Blocks []Block
+	Chunks []Chunk
}
```
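A task travels between these queues as follows: `GetTask` moves a task from the todo queue to the pending queue, `TaskFinished` moves it from pending to done, and a timed-out pending task is moved back to todo (or onto the failed list once it exceeds the timeout limit), as implemented in `service.go` later in this commit.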
......
@@ -55,7 +55,7 @@ The trainer select process is encapsulated in the C API function:
```c
int paddle_begin_init_params(paddle_pserver_client* client, const char* config_proto);
```
-The selected trainer's call to `paddle_begin_init_params` will return 1, and the other trainers' calls to `paddle_begin_init_params` will block until initialization is done, and return 0. As illustrated below:
+The selected trainer's call to `paddle_begin_init_params` will return 1, and the other trainers' calls to `paddle_begin_init_params` will return 0. `paddle_get_params` will block until initialization is completed. As illustrated below:
<img src="./src/pserver_init.png">
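To make the handshake concrete, here is a minimal sketch of the trainer-side flow. `num_params`, `params`, `configs`, and `config_lens` are illustrative variables, and the `paddle_finish_init_params`/`paddle_get_params` signatures are assumptions based on the names referenced in this document, not the final API:

```c
/* Sketch only; error handling abbreviated. */
if (paddle_begin_init_params(client)) {
  /* This trainer was selected to initialize the parameters. */
  for (int i = 0; i < num_params; ++i) {
    if (paddle_init_param(client, params[i], configs[i], config_lens[i]) != 0) {
      /* Restart from paddle_begin_init_params, or exit and let the
       * cluster management system restart the trainer. */
    }
  }
  paddle_finish_init_params(client);
}
/* Every trainer (selected or not) fetches the initialized parameters;
 * this call blocks until initialization is completed. */
paddle_get_params(client, /* ... */);
```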
@@ -89,16 +89,13 @@ void paddle_pserver_client_release(paddle_pserver_client* client);
 *
 * paddle_begin_init_params will be called from multiple trainers,
 * only one trainer will be selected to initialize the parameters on
-* parameter servers. Other trainers will be blocked until the
-* initialization is done, and they need to get the initialized
+* parameter servers. Other trainers need to get the initialized
 * parameters from parameter servers using @paddle_get_params.
 *
-* @param pserver_config_proto serialized parameter server configuration in
-* Protocol Buffers format.
 * @return 1 if the trainer is selected to initialize parameter
 * servers, otherwise 0.
 */
-int paddle_begin_init_params(paddle_pserver_client* client, const char* pserver_config_proto);
+int paddle_begin_init_params(paddle_pserver_client* client);
/**
 * @brief paddle_init_param initializes the parameter on parameter
@@ -106,12 +103,13 @@ int paddle_begin_init_params(paddle_pserver_client* client, const char* pserver_
 *
 * @param param the parameter to initialize.
 * @param param_config_proto the configuration for the parameter.
+* @param config_len the length of param_config_proto
 * @return 0 if successful, otherwise -1. On failure, the trainer
 * needs to restart the entire initialization process (starting from
 * @paddle_begin_init_param). Or simply exit the program and wait for
 * the cluster management system to restart the trainer.
 */
-int paddle_init_param(paddle_pserver_client* client, paddle_parameter params, const char* param_config_proto);
+int paddle_init_param(paddle_pserver_client* client, paddle_parameter param, const unsigned char* param_config_proto, int config_len);
/**
 * @brief paddle_finish_init_params tells parameter servers client has
......
# Design Doc: The C++ Class `Parameters`
`Parameters` is a concept we designed in the Paddle V2 API. `Parameters` is a container of parameters that lets Paddle share parameters between topologies. We described usages of `Parameter` in [api.md](./api.md).
We previously implemented `Parameters` in Python when designing the V2 API. The current implementation has several defects:
* We just use `memcpy` to share Parameters between topologies, which is very inefficient.
* We did not implement sharing Parameters while training; we only trigger `memcpy` when training starts.
It is necessary to implement Parameters on the C++ side. However, this requires refactoring Paddle, because Paddle was previously designed to train only one topology, i.e., each GradientMachine contains its Parameter as a data member. In the current Paddle implementation, there are three concepts associated with `Parameters`:
1. `paddle::Parameter`. A `Parameters` is a container for `paddle::Parameter`.
It is evident that we should use `paddle::Parameter` when developing `Parameters`.
However, the `Parameter` class contains many functions and does not have a clear interface.
It contains `create/store Parameter`, `serialize/deserialize`, `optimize (i.e. SGD)`, and `randomize/zero`.
When developing `Parameters`, we only use the `create/store Parameter` functionality.
We should extract the functionalities of Parameter into several classes to clean up the Paddle C++ implementation.
2. `paddle::GradientMachine` and its sub-classes, e.g., `paddle::MultiGradientMachine`, `paddle::NeuralNetwork`.
We should pass `Parameters` to `paddle::GradientMachine` during `forward/backward` to avoid `memcpy` between topologies.
Also, we should handle multi-GPU/CPU training, because `forward` and `backward` would run on multiple GPUs and CPUs.
`Parameters` should dispatch the parameter value to each device, and gather the parameter gradient from each device.
3. `paddle::ParameterUpdater`. The ParameterUpdater is used to update parameters in Paddle.
So `Parameters` should be used by `paddle::ParameterUpdater`, and `paddle::ParameterUpdater` should optimize `Parameters` (by SGD).
The step-by-step approach for implementing Parameters in the Paddle C++ core is listed below. Each step should be a PR that can be merged into Paddle one by one.
1. Clean up the `paddle::Parameter` interface. Extract the functionalities of `paddle::Parameter` to prepare for the implementation of Parameters.
2. Implement a `Parameters` class that simply stores the `paddle::Parameter` objects inside. Make `GradientMachine` use `Parameters` as a class member.
3. Make `Parameters` support multi-CPU and multi-GPU training to prepare for sharing `Parameter` between topologies.
Because we need to share `Parameters` between topologies, it is `Parameters`'s responsibility to exchange parameters between GPUs.
`GradientMachine` should not handle how to exchange parameters, because a `GradientMachine` is only used to train one topology and we need to support training many topologies in Paddle, i.e., there could be many GradientMachines using one `Parameters` object.
* We should use a global function, not a member function of `Parameters`, to exchange parameters between GPUs. `MultiGradientMachine` invokes this function, which takes `Parameters` as input.
* `MultiGradientMachine` contains many functionalities. Extracting the parameter-exchanging logic makes `MultiGradientMachine` clearer and simpler.
4. Make `Parameters` an argument of the `forward/backward` functions, not a data member of `GradientMachine`. For example, `forward` could be `forward(const Parameters& params, ...)` and `backward` could be `backward(Parameters* params, ...)`. After this step, Paddle can share `Parameters` between topologies (a rough sketch follows this list).
5. `ParameterUpdater` is invoked by `GradientMachine` and `Trainer`, but it updates `Parameters`. At the end of this refactoring, we can change `ParameterUpdater` to use `Parameters` directly, which makes `ParameterUpdater`'s implementation clearer.
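As a rough illustration of steps 2 and 4, a minimal sketch of the container is shown below. The member names and the `forward/backward` signatures are assumptions for illustration only, not the final design; `ParameterPtr` is Paddle's existing `std::shared_ptr<Parameter>`:

```cpp
#include <map>
#include <string>
// Assumes paddle::Parameter / ParameterPtr from paddle/parameter/Parameter.h.

// A Parameters object owns a set of paddle::Parameter instances and can be
// shared between topologies instead of copying buffers with memcpy.
class Parameters {
public:
  void addParameter(ParameterPtr param) { params_[param->getName()] = param; }
  ParameterPtr get(const std::string& name) const {
    auto it = params_.find(name);
    return it == params_.end() ? nullptr : it->second;
  }

private:
  std::map<std::string, ParameterPtr> params_;  // parameter name -> parameter
};

// Step 4: Parameters becomes an argument instead of a GradientMachine data
// member, so several GradientMachines can train with one Parameters object:
//   void GradientMachine::forward(const Parameters& params, ...);
//   void GradientMachine::backward(Parameters* params, ...);
```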
# DeepSpeech2 on PaddlePaddle: Design Doc
We are planning to build Deep Speech 2 (DS2) \[[1](#references)\], a powerful Automatic Speech Recognition (ASR) engine, on PaddlePaddle. For the first-stage plan, we have the following short-term goals:
- Release a basic distributed implementation of DS2 on PaddlePaddle.
- Contribute a chapter of Deep Speech to PaddlePaddle Book.
Intensive system optimization and low-latency inference library (details in \[[1](#references)\]) are not yet covered in this first-stage plan.
## Table of Contents
- [Tasks](#tasks)
- [Task Dependency](#task-dependency)
- [Design Details](#design-details)
- [Overview](#overview)
- [Row Convolution](#row-convolution)
- [Beam Search With CTC and LM](#beam-search-with-ctc-and-lm)
- [Future Work](#future-work)
- [References](#references)
## Tasks
We roughly break down the project into 14 tasks:
1. Develop an **audio data provider**:
- JSON filelist generator.
- Audio file format transformer.
- Spectrogram feature extraction, power normalization etc.
- Batch data reader with SortaGrad.
- Data augmentation (optional).
- Prepare (one or more) public English data sets & baseline.
2. Create a **simplified DS2 model configuration**:
- With only fixed-length (by padding) audio sequences (otherwise need *Task 3*).
- With only bidirectional-GRU (otherwise need *Task 4*).
- With only greedy decoder (otherwise need *Task 5, 6*).
3. Develop to support **variable-shaped** dense-vector (image) batches of input data.
- Update `DenseScanner` in `dataprovider_converter.py`, etc.
4. Develop a new **lookahead-row-convolution layer** (See \[[1](#references)\] for details):
- Lookahead convolution windows.
- Within-row convolution, without kernels shared across rows.
5. Build KenLM **language model** (5-gram) for beam search decoder:
- Use KenLM toolkit.
- Prepare the corpus & train the model.
- Create inference interfaces (for Task 6).
6. Develop a **beam search decoder** with CTC + LM + WORDCOUNT:
- Beam search with CTC.
- Beam search with external custom scorer (e.g. LM).
- Try to design a more general beam search interface.
7. Develop a **Word Error Rate evaluator**:
- Update `ctc_error_evaluator` (CER) to support WER.
8. Prepare internal dataset for Mandarin (optional):
- Dataset, baseline, evaluation details.
- Particular data preprocessing for Mandarin.
- Might require cooperation with the Speech Department.
9. Create **standard DS2 model configuration**:
- With variable-length audio sequences (need *Task 3*).
- With unidirectional-GRU + row-convolution (need *Task 4*).
- With CTC-LM beam search decoder (need *Task 5, 6*).
10. Make it run perfectly on **clusters**.
11. Experiments and **benchmarking** (for accuracy, not efficiency):
- With public English dataset.
- With internal (Baidu) Mandarin dataset (optional).
12. Time **profiling** and optimization.
13. Prepare **docs**.
14. Prepare PaddlePaddle **Book** chapter with a simplified version.
## Task Dependency
Tasks parallelizable within phases:
Roadmap | Description | Parallelizable Tasks
----------- | :------------------------------------ | :--------------------
Phase I | Simplified model & components | *Task 1* ~ *Task 8*
Phase II | Standard model & benchmarking & profiling | *Task 9* ~ *Task 12*
Phase III | Documentation | *Task 13* ~ *Task 14*
An issue will be created for each task later. Contributions, discussions and comments are all highly appreciated and welcome!
## Design Details
### Overview
Traditional **ASR** (Automatic Speech Recognition) pipelines require great human effort devoted to elaborately tuning multiple hand-engineered components (e.g. audio feature design, acoustic model, pronunciation model, language model, etc.). **Deep Speech 2** (**DS2**) \[[1](#references)\], however, trains such ASR models in an end-to-end manner, replacing most intermediate modules with a single deep network architecture. By scaling up both the data and model sizes, DS2 achieves a very significant performance boost.
Please read the Deep Speech 2 papers \[[1](#references),[2](#references)\] for more background knowledge.
The classical DS2 network contains 15 layers (from bottom to top):
- **Two** data layers (audio spectrogram, transcription text)
- **Three** 2D convolution layers
- **Seven** uni-directional simple-RNN layers
- **One** lookahead row convolution layer
- **One** fully-connected layer
- **One** CTC-loss layer
<div align="center">
<img src="image/ds2_network.png" width=350><br/>
Figure 1. Architecture of the Deep Speech 2 Network.
</div>
We don't have to insist on this 2-3-7-1-1-1 depth \[[2](#references)\]. Similar networks with different depths might also work well. As in \[[1](#references)\], the authors use a different depth (e.g. 2-2-3-1-1-1) for the final experiments.
Key ingredients about the layers:
- **Data Layers**:
- Frame sequences data of audio **spectrogram** (with FFT).
- Token sequences data of **transcription** text (labels).
- These two types of sequences do not have the same lengths, thus a CTC-loss layer is required.
- **2D Convolution Layers**:
- Not only temporal convolution, but also **frequency convolution**. Like a 2D image convolution, but with a variable dimension (i.e. the temporal dimension).
- With striding for only the first convolution layer.
- No pooling for any convolution layer.
- **Uni-directional RNNs**
- Uni-directional + row convolution: for low-latency inference.
- Bi-directional + without row convolution: if we don't care about the inference latency.
- **Row convolution**:
- Looks only a few steps ahead in the feature sequence, instead of at the whole sequence as bi-directional RNNs do.
- Not necessary with bi-directional RNNs.
- "**Row**" means convolutions are done within each frequency dimension (row), and no convolution kernels are shared across rows.
- **Batch Normalization Layers**:
- Added to all above layers (except for the data and loss layers).
- Sequence-wise normalization for RNNs: BatchNorm is performed only on the input-to-hidden projection and not on the hidden-to-hidden (recurrent) projection, for efficiency (see the formula below).
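For a simple recurrent layer h_t = f(W x_t + U h_{t-1} + b), sequence-wise batch normalization as we read it from \[[2](#references)\] applies the BatchNorm transform B(·) only to the input-to-hidden term:

h_t = f(B(W x_t) + U h_{t-1})

where B(·) is computed over all time steps of all sequences in a minibatch. The exact placement of the bias and scale terms here is our reading of the paper and should be checked against the final implementation.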
Required Components | PaddlePaddle Support | Need to Develop
:------------------------------------- | :-------------------------------------- | :-----------------------
Data Layer I (Spectrogram) | Not supported yet. | TBD (Task 3)
Data Layer II (Transcription) | `paddle.data_type.integer_value_sequence` | -
2D Convolution Layer | `paddle.layer.image_conv_layer` | -
DataType Converter (vec2seq) | `paddle.layer.block_expand` | -
Bi-/Uni-directional RNNs | `paddle.layer.recurrent_group` | -
Row Convolution Layer | Not supported yet. | TBD (Task 4)
CTC-loss Layer | `paddle.layer.warp_ctc` | -
Batch Normalization Layer | `paddle.layer.batch_norm` | -
CTC-Beam search | Not supported yet. | TBD (Task 6)
### Row Convolution
TODO by Assignees
### Beam Search with CTC and LM
TODO by Assignees
## Future Work
- Efficiency Improvement
- Accuracy Improvement
- Low-latency Inference Library
- Large-scale benchmarking
## References
1. Dario Amodei, et al., [Deep Speech 2: End-to-End Speech Recognition in English and Mandarin](http://proceedings.mlr.press/v48/amodei16.pdf). ICML 2016.
2. Dario Amodei, et al., [Deep Speech 2: End-to-End Speech Recognition in English and Mandarin](https://arxiv.org/abs/1512.02595). arXiv:1512.02595.
package main
import (
"fmt"
"net"
"net/http"
"net/rpc"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/namsral/flag"
"github.com/PaddlePaddle/Paddle/paddle/go/master"
"github.com/PaddlePaddle/Paddle/paddle/go/recordio"
)
func main() {
port := flag.Int("port", 8080, "port of the master server.")
	dataset := flag.String("training_dataset", "", "dataset: comma separated paths to RecordIO files, supports glob patterns.")
	faultTolerance := flag.Bool("fault_tolerance", false, "enable fault tolerance (requires etcd).")
	taskTimeoutDur := flag.Duration("task_timeout_dur", 20*time.Minute, "task timeout duration.")
	taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timeout count for each task before it is declared a failed task.")
chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.")
flag.Parse()
if *dataset == "" {
panic("no dataset specified.")
}
if *faultTolerance {
panic("fault tolernance not implemented.")
}
var chunks []master.Chunk
var paths []string
ss := strings.Split(*dataset, ",")
fmt.Println(ss)
for _, s := range ss {
match, err := filepath.Glob(s)
if err != nil {
panic(err)
}
paths = append(paths, match...)
}
if len(paths) == 0 {
panic("no valid datset specified.")
}
idx := 0
for _, path := range paths {
f, err := os.Open(path)
if err != nil {
panic(err)
}
index, err := recordio.LoadIndex(f)
if err != nil {
panic(err)
}
f.Close()
count := index.NumChunks()
for i := 0; i < count; i++ {
chunk := master.Chunk{
Idx: idx,
Path: path,
Index: *index.ChunkIndex(i),
}
chunks = append(chunks, chunk)
}
}
s := master.NewService(chunks, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
err := rpc.Register(s)
if err != nil {
panic(err)
}
rpc.HandleHTTP()
l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
if err != nil {
panic(err)
}
err = http.Serve(l, nil)
if err != nil {
panic(err)
}
}
package main
import (
-	"flag"
	"net"
	"net/http"
	"net/rpc"
	"strconv"
+	"github.com/namsral/flag"
	"github.com/PaddlePaddle/Paddle/paddle/go/pserver"
)
func main() {
-	port := flag.Int("p", 0, "port of the pserver")
+	port := flag.Int("port", 0, "port of the pserver")
	flag.Parse()
	s := pserver.NewService()
......
package master
import (
"errors"
"log"
"sync"
"time"
"github.com/PaddlePaddle/Paddle/paddle/go/recordio"
)
const (
targetTaskCount = 300
)
// errors
var (
ErrNoMoreTask = errors.New("no more task for current pass")
ErrPendingTaskNotFound = errors.New("pending task not found")
)
// Service is the master server service.
type Service struct {
timeoutDur time.Duration
timeoutMax int
mu sync.Mutex
taskQueues taskQueues
}
// Recover recovers service state from etcd.
func Recover() (*Service, error) {
// TODO(helin): recover from snapshot state from etcd.
return nil, nil
}
// partition groups chunks into tasks, chunksPerTask chunks per task.
func partition(chunks []Chunk, chunksPerTask int) []taskEntry {
id := 0
if chunksPerTask <= 0 {
chunksPerTask = 1
}
var result []taskEntry
var cur taskEntry
for i, c := range chunks {
if i%chunksPerTask == 0 && len(cur.Task.Chunks) > 0 {
cur.Task.ID = id
id++
result = append(result, cur)
cur.Task.Chunks = nil
}
cur.Task.Chunks = append(cur.Task.Chunks, c)
}
if len(cur.Task.Chunks) > 0 {
cur.Task.ID = id
id++
result = append(result, cur)
}
return result
}
// NewService creates a new service.
func NewService(chunks []Chunk, chunksPerTask int, timeoutDur time.Duration, timeoutMax int) *Service {
s := &Service{}
s.timeoutDur = timeoutDur
s.timeoutMax = timeoutMax
s.taskQueues = taskQueues{}
s.taskQueues.Pending = make(map[int]taskEntry)
s.taskQueues.Todo = partition(chunks, chunksPerTask)
return s
}
// Chunk is a chunk of data consisting of several data instances.
type Chunk struct {
	Idx int // index of the chunk within the file
	Path string
	Index recordio.Index // chunk index
}
// Task is the basic unit of data instances assigned to trainers.
type Task struct {
ID int
Chunks []Chunk
}
type taskEntry struct {
Epoch int
NumTimeout int
Task Task
}
type taskQueues struct {
Todo []taskEntry
Pending map[int]taskEntry // map from task ID to task entry
Done []taskEntry
Failed []Task
}
// snapshot *must* be called with s.mu held.
func (s *Service) snapshot() error {
// TODO(helin): snapshot state on etcd.
return nil
}
// GetTask gets a new task from the service.
func (s *Service) GetTask(dummy int, task *Task) error {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.taskQueues.Todo) == 0 {
return ErrNoMoreTask
}
t := s.taskQueues.Todo[0]
t.Epoch++
s.taskQueues.Todo = s.taskQueues.Todo[1:]
s.taskQueues.Pending[t.Task.ID] = t
err := s.snapshot()
if err != nil {
return err
}
time.AfterFunc(s.timeoutDur, func(taskID int, epoch int) func() {
return func() {
s.mu.Lock()
defer s.mu.Unlock()
t, ok := s.taskQueues.Pending[taskID]
if !ok {
return
}
if t.Epoch != epoch {
// new epoch, task launched after the
// schedule of this timeout check.
return
}
defer func() {
err := s.snapshot()
if err != nil {
log.Println(err)
}
}()
delete(s.taskQueues.Pending, t.Task.ID)
t.NumTimeout++
if t.NumTimeout > s.timeoutMax {
s.taskQueues.Failed = append(s.taskQueues.Failed, t.Task)
return
}
s.taskQueues.Todo = append(s.taskQueues.Todo, t)
}
}(t.Task.ID, t.Epoch))
return nil
}
// TaskFinished tells the service that a task is finished.
func (s *Service) TaskFinished(taskID int, dummy *int) error {
s.mu.Lock()
defer s.mu.Unlock()
t, ok := s.taskQueues.Pending[taskID]
if !ok {
return ErrPendingTaskNotFound
}
// task finished, reset timeout
t.NumTimeout = 0
s.taskQueues.Done = append(s.taskQueues.Done, t)
delete(s.taskQueues.Pending, taskID)
return s.snapshot()
}
package master
import "testing"
func TestPartitionCount(t *testing.T) {
cs := make([]Chunk, 100)
ts := partition(cs, 5)
if len(ts) != 20 {
t.Error(len(ts))
}
cs = make([]Chunk, 101)
ts = partition(cs, 5)
if len(ts) != 21 {
t.Error(len(ts))
}
ts = partition(cs, 1)
if len(ts) != 101 {
t.Error(len(ts))
}
ts = partition(cs, 0)
if len(ts) != 101 {
t.Error(len(ts))
}
}
func TestPartitionIndex(t *testing.T) {
cs := make([]Chunk, 100)
ts := partition(cs, 20)
for i := range ts {
if ts[i].Task.ID != i {
t.Error(ts[i], i)
}
}
}
@@ -758,7 +758,7 @@ public:
T p3); // decayRate
/// apply L1/L2 to *this*
-void applyL1(T learningRate, T decayRate);
+virtual void applyL1(T learningRate, T decayRate);
void applyL1(BaseMatrixT& lr, T learningRate, T decayRate);
void applyL2(T learningRate, T decayRate);
void applyL2(BaseMatrixT& lr, T learningRate, T decayRate);
......
@@ -54,7 +54,7 @@ void SparseRowCpuMatrix::zeroMem() {
clearRows();
}
-void SparseRowCpuMatrix::applyL1Decay(real learningRate, real decayRate) {
+void SparseRowCpuMatrix::applyL1(real learningRate, real decayRate) {
apply([=](real* buf, size_t len) {
CpuVector value(0, nullptr);
value.subVecFrom(buf, 0, len);
......
@@ -94,7 +94,7 @@ public:
/**
 * apply L1 to all sparse rows; should be applied after indices are ready.
 */
-void applyL1Decay(real learningRate, real decayRate);
+virtual void applyL1(real learningRate, real decayRate);
void clearIndices() { clearRows(); }
void zeroMemThread(size_t tid, size_t numThreads);
......
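The rename from `applyL1Decay` to `applyL1`, together with the added `virtual`, lets callers apply L1 decay through a base-class pointer without caring about the concrete matrix type. A minimal sketch of the dispatch (variable names illustrative):

```cpp
// Sketch only: with applyL1 virtual in BaseMatrixT, the same call site
// dispatches to SparseRowCpuMatrix::applyL1 for sparse parameters and to
// the dense BaseMatrixT implementation otherwise.
MatrixPtr value = para->getMat(PARAMETER_VALUE);
value->applyL1(/*learningRate=*/1.0f, /*decayRate=*/decayRate);
```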
@@ -20,6 +20,7 @@ limitations under the License. */
#include "OptimizerFunctions.h"
#include "OptimizerWithRegularizer.h"
#include "ParameterUpdateFunctions.h"
+#include "ThreadLocalBuffer.h"
#include "hl_gpu.h"
#include "paddle/math/CpuSparseMatrix.h"
#include "paddle/math/MathUtils.h"
@@ -262,15 +263,6 @@ void Parameter::setMat(ParameterType pType, int matType) {
}
}
-SparsePrefetchRowCpuMatrix* Parameter::getPrefetchMatrix() {
-  MatrixPtr mat = mats_[PARAMETER_VALUE];
-  if (mat) {
-    return dynamic_cast<SparsePrefetchRowCpuMatrix*>(mat.get());
-  }
-  return nullptr;
-}
void Parameter::incUpdate(const UpdateCallback& callback) {
// Static parameter is fixed, and does not need to be updated
if (isStatic()) {
@@ -422,37 +414,4 @@ bool Parameter::load(std::istream& s) {
return true;
}
-ThreadLocal<std::vector<VectorPtr>> Parameter::tlsTempBufs_;
-VectorPtr* Parameter::getTlsTempBufs() {
-  std::vector<VectorPtr>& bufs = *tlsTempBufs_;
-  if (bufs.empty()) {
-    bufs.resize(NUM_PARAMETER_TYPES);
-    for (auto& vec : bufs) {
-      vec.reset(new CpuVector(0, nullptr));
-    }
-  }
-  return bufs.data();
-}
-void Parameter::exec(ExecFunc func) {
-  auto execFunc = [this, func](int tid, size_t numThreads) {
-    if (numThreads == 1) {  // single thread
-      func(this->getBufs());
-    } else {  // multi thread
-      VectorPtr* vecs = Parameter::getTlsTempBufs();
-      auto interval = calcSplitArrayInterval(
-          this->getSize(), (size_t)tid, numThreads, 8LU /*for avx*/);
-      for (size_t i = 0; i < (size_t)NUM_PARAMETER_TYPES; ++i) {
-        if (bufs_[i]) {
-          vecs[i]->subVecFrom(*bufs_[i], interval);
-        }
-      }
-      func(vecs);
-    }
-  };
-  getBuf(PARAMETER_VALUE)->exec(execFunc);
-}
} // namespace paddle
@@ -40,17 +40,6 @@ class Parameter;
typedef std::function<void(Parameter* param)> UpdateCallback;
typedef std::function<void(int paramId, Parameter* param)> ParamInitCallback;
-struct Segment {
-  int64_t beginDim;
-  int64_t endDim;
-  // We allow the possibility that the parameters are not stored at contiguous
-  // memory locations for speed reason (i.e. data alignemnt)
-  // This means that the dimenstion is not same as the position in the memroy
-  // buffer.
-  int64_t beginPos; // beginning position in the local value or grad buffer
-};
class Parameter;
typedef std::shared_ptr<Parameter> ParameterPtr;
@@ -167,13 +156,6 @@ public:
}
}
-void enableSharedType(ParameterType type, VectorPtr vec, MatType matType) {
-  if (!bufs_[type]) {
-    bufs_[type] = vec;
-    setMat(type, matType);
-  }
-}
/// for batchGradientMachine: blockNum is number of partitions of the matrix.
bool isGradShared(size_t* blockNum = NULL);
@@ -203,20 +185,6 @@ public:
const MatrixPtr& getMat(ParameterType pType) const { return mats_[pType]; }
-const IVectorPtr& getIntBuf(ParameterType pType) { return intBufs_[pType]; }
-void setIntBuf(ParameterType pType, const IVectorPtr& iVec) {
-  intBufs_[pType] = iVec;
-}
-SparsePrefetchRowCpuMatrix* getPrefetchMatrix();
-float getLearnRate() const { return config_.learning_rate(); }
-float getInitMean() const { return config_.initial_mean(); }
-float getInitStandardDeviation() const { return config_.initial_std(); }
void setValueUpdated() { updated_ = true; }
void clearValueUpdated() { updated_ = false; }
@@ -243,8 +211,6 @@ public:
 */
bool load(std::istream& is);
-std::vector<Segment>& getGradientSegments() { return gradSegments_; }
void incShared() { sharedCount_++; }
/**
@@ -351,35 +317,21 @@ protected:
int sharedCount_;
int updateCounter_;
-std::vector<Segment> gradSegments_; // segments of non-zero gradient
bool updated_;
SparseFormat format_;
-static ThreadLocal<std::vector<VectorPtr>> tlsTempBufs_;
std::vector<std::shared_ptr<IParameterUpdaterHook>> updaterHooks_;
public:
-void setSharedCount(int cnt) { sharedCount_ = cnt; }
int getSharedCount() { return sharedCount_; }
-void singleUpdate(void* data);
bool isSparse() { return config_.is_sparse(); }
SparseFormat getFormat() { return format_; }
static const std::string kMissParameterFail;
static const std::string kMissParameterRand;
static const std::string kMissParameterZero;
-static VectorPtr* getTlsTempBufs();
-/**
- * exec a func in single/multi thread.
- * vecs is bufs_ of Parameter, as input of ExecFunc.
- */
-typedef std::function<void(const VectorPtr vecs[])> ExecFunc;
-void exec(ExecFunc func);
};
typedef std::map<std::string, ParameterPtr> ParameterMap;
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "ThreadLocalBuffer.h"
#include "Parameter.h"
namespace paddle {
namespace parameter {
static ThreadLocal<std::vector<VectorPtr>> tlsTempBufs_;
VectorPtr* getThreadLocalBuffer() {
std::vector<VectorPtr>& bufs = *tlsTempBufs_;
if (bufs.empty()) {
bufs.resize(NUM_PARAMETER_TYPES);
for (auto& vec : bufs) {
vec.reset(new CpuVector(0, nullptr));
}
}
return bufs.data();
}
} // namespace parameter
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include "paddle/math/Vector.h"
namespace paddle {
namespace parameter {
extern VectorPtr* getThreadLocalBuffer();
} // namespace parameter
} // namespace paddle
@@ -243,7 +243,8 @@ void ParameterClient2::prepareSendData(
CHECK_GE(blockSize, 1LU) << "blockSize should > 0 " << blockSize;
const auto paraSize = parameter->getSize();
if (sparseUpdate) {
-const auto prefetchMat = parameter->getPrefetchMatrix();
+auto prefetchMat = std::dynamic_pointer_cast<SparsePrefetchRowCpuMatrix>(
+    parameter->getMat(PARAMETER_VALUE));
CHECK(prefetchMat != nullptr) << "prefetchMat is nullptr";
auto sendMat = dynamic_cast<SparseRowCpuMatrix*>(
parameter->getMat(parameterType).get());
......
@@ -18,7 +18,6 @@ limitations under the License. */
#include <fstream>
#include "paddle/math/SIMDFunctions.h"
#include "paddle/parameter/AverageOptimizer.h"
#include "paddle/parameter/FirstOrderOptimizer.h"
#include "paddle/parameter/OptimizerFunctions.h"
@@ -26,6 +25,7 @@ limitations under the License. */
#include "paddle/parameter/ParameterOptimizer.h"
#include "paddle/parameter/ParameterUpdateFunctions.h"
#include "paddle/parameter/Regularizer.h"
+#include "paddle/parameter/ThreadLocalBuffer.h"
#include "paddle/utils/Flags.h"
#include "paddle/utils/GlobalConstants.h"
#include "paddle/utils/Stat.h"
@@ -618,7 +618,7 @@ void ParameterServer2::asyncSGD(const SendParameterRequest& request,
bool commitGradient = asyncGrdientCommitCheckAndStat(request);
-VectorPtr* vecs = Parameter::getTlsTempBufs();
+VectorPtr* vecs = parameter::getThreadLocalBuffer();
size_t bufferIndex = 0;
for (const auto& block : request.blocks()) {
int64_t offset = getBlockOffset(block);
@@ -1051,15 +1051,15 @@ void ParameterServer2::clearUnusedSegments(CpuVector* vec) {
}
void ParameterServer2::parallelExecForEachBlock(ExecFunc func) {
-  SyncThreadPool::execHelper(syncThreadPool_.get(),
-                             [&](int tid, size_t numThreads) {
-                               int64_t numBlocks = blockIdMap_.size();
-                               VectorPtr* vecs = Parameter::getTlsTempBufs();
-                               for (int64_t blockId = tid; blockId < numBlocks;
-                                    blockId += numThreads) {
-                                 func(blockId, vecs);
-                               }
-                             });
+  SyncThreadPool::execHelper(
+      syncThreadPool_.get(), [&](int tid, size_t numThreads) {
+        int64_t numBlocks = blockIdMap_.size();
+        VectorPtr* vecs = parameter::getThreadLocalBuffer();
+        for (int64_t blockId = tid; blockId < numBlocks;
+             blockId += numThreads) {
+          func(blockId, vecs);
+        }
+      });
}
void ParameterServer2::blockTraverse(
......
@@ -17,6 +17,7 @@ import collections
import swig_paddle
import numpy
import itertools
+from functools import reduce
__all__ = ['DataProviderConverter']
@@ -65,6 +66,8 @@ class IScanner(object):
        :param argument: Output arguments object.
        :type argument: swig_paddle.Arguments
+       :param dat: A sample of the input data.
+       :type dat: any Python object, numpy.array or list
        :return:
        """
        pass
@@ -95,17 +98,35 @@ class DenseScanner(IScanner):
    def __init__(self, input_type, pos):
        IScanner.__init__(self, input_type, pos)
        self.__mat__ = None
+       self.__shape__ = None
        self.__height__ = 0
+       self.__dim__ = 0

    def pre_scan(self, dat):
        self.__height__ += 1
+       if self.__shape__ is None:
+           self.__shape__ = numpy.array(dat).shape
+           if len(self.__shape__) > 3:
+               raise ValueError(
+                   "The dimension of input cannot be greater than 3.")
+           self.__dim__ = reduce(lambda x, y: x * y, self.__shape__)
+           if len(self.__shape__) == 1 and self.__dim__ != self.input_type.dim:
+               raise ValueError(
+                   "The data size must be equal to the size defined in the data layer.")
+       else:
+           if self.__shape__ != numpy.array(dat).shape:
+               raise ValueError(
+                   "The data shape must be the same in one mini-batch.")
    def finish_pre_scan(self, argument):
        self.__mat__ = numpy.ndarray(
-           shape=(self.__height__, self.input_type.dim), dtype=numpy.float32)
+           shape=(self.__height__, self.__dim__), dtype=numpy.float32)
        self.__height__ = 0

    def scan(self, dat):
+       # Convert to a flattened NumPy array for speed.
+       dat = numpy.array(dat)
+       dat = dat.flatten()
        self.__mat__[self.__height__] = dat
        self.__height__ += 1
@@ -116,6 +137,14 @@ class DenseScanner(IScanner):
        m = swig_paddle.Matrix.createDenseFromNumpy(self.__mat__, True,
                                                    self.data_in_gpu)
        argument.setSlotValue(self.pos, m)
+       if len(self.__shape__) > 1:
+           # The last two dimensions are the frame height and width.
+           # For example, the layout is CHW for a 3-D image feature,
+           # where H and W are the frame height and width.
+           h, w = self.__shape__[-2:]
+           argument.setSlotFrameHeight(self.pos, h)
+           argument.setSlotFrameWidth(self.pos, w)
+       self.__shape__ = None

class SparseBinaryScanner(IScanner):
......
@@ -747,28 +747,32 @@ void SparseRemoteParameterUpdater::getParametersRemote(bool fullSize,
                                                       bool apply) {
  ParameterType sendBackParameterType =
      (useApplyInPserver_ && apply) ? PARAMETER_APPLY : PARAMETER_VALUE;
+  std::function<void()> getParams;
+  std::function<void(Parameter&, real)> applyL1;
  if (fullSize) {
-    parameterClient_->getParameter(
-        /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
-    if (config_.shrink_parameter_value() > 0) {
-      for (auto& para : parameters_) {
-        if (para->getConfig().decay_rate_l1() > 0) {
-          para->getBuf(PARAMETER_VALUE)
-              ->applyL1(1.0f,  // learningRate
-                        config_.shrink_parameter_value());  // decayRate
-        }
-      }
-    }
+    getParams = [&] {
+      parameterClient_->getParameter(
+          /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
+    };
+    applyL1 = [](Parameter& para, real decayRate) {
+      para.getBuf(PARAMETER_VALUE)->applyL1(/*lr=*/1.0f, decayRate);
+    };
  } else {
-    REGISTER_TIMER("getParamSparse");
-    parameterClient_->getParameterSparse(
-        /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
+    getParams = [&] {
+      parameterClient_->getParameterSparse(
+          /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
+    };
+    applyL1 = [](Parameter& para, real decayRate) {
+      para.getMat(PARAMETER_VALUE)->applyL1(/*lr=*/1.0f, decayRate);
+    };
+  }
+  {
+    REGISTER_TIMER("getParamDenseAndSparse");
+    getParams();
    if (config_.shrink_parameter_value() > 0) {
      for (auto& para : parameters_) {
        if (para->getConfig().decay_rate_l1() > 0) {
-          para->getPrefetchMatrix()->applyL1Decay(
-              1.0f,  // learningRate
-              config_.shrink_parameter_value());  // decayRate
+          applyL1(*para, config_.shrink_parameter_value());
        }
      }
    }
......
@@ -17,6 +17,7 @@ limitations under the License. */
#include "paddle/utils/Logging.h"
#include "paddle/math/SparseRowMatrix.h"
+#include "paddle/parameter/ThreadLocalBuffer.h"
#include "paddle/utils/Thread.h"
DECLARE_int32(trainer_count);
@@ -98,7 +99,7 @@ void SgdThreadUpdater::threadTraverse(
int tid,
size_t numThreads,
Parameter* para) {
-VectorPtr* vecs = Parameter::getTlsTempBufs();
+VectorPtr* vecs = parameter::getThreadLocalBuffer();
if (para->isGradSparseUpdate()) {
size_t height = para->getConfig().dims(0);
size_t width = para->getConfig().dims(1);
@@ -214,7 +215,7 @@ void SgdThreadUpdater::threadUpdateSparse(int tid,
Parameter* para) {
int pid = para->getID();
ParameterOptimizer* optimizer = optimizers_[pid].get();
-VectorPtr* vecs = Parameter::getTlsTempBufs();
+VectorPtr* vecs = parameter::getThreadLocalBuffer();
size_t height = para->getConfig().dims(0);
size_t width = para->getConfig().dims(1);
@@ -286,7 +287,7 @@ void SgdThreadUpdater::threadUpdateDense(int tid,
Parameter* para) {
int pid = para->getID();
ParameterOptimizer* optimizer = optimizers_[pid].get();
-VectorPtr* vecs = Parameter::getTlsTempBufs();
+VectorPtr* vecs = parameter::getThreadLocalBuffer();
auto interval = calcSplitArrayInterval(
para->getSize(), (size_t)tid, numThreads, 8LU /*for avx*/);
......
@@ -55,8 +55,11 @@ public:
};
#else
+// clang-format off
+#include <cstddef>
#include <atomic>
+// clang-format on
class SpinLockPrivate {
public:
inline void lock() {
......
@@ -24,15 +24,16 @@ add_custom_target(paddle_python ALL DEPENDS
    ${OUTPUT_DIR}/.timestamp)
set(PADDLE_PYTHON_PACKAGE_DIR ${CMAKE_CURRENT_BINARY_DIR}/dist/)
-add_subdirectory(paddle/trainer_config_helpers/tests)
-if (WITH_SWIG_PY)
-  # enable v2 API unittest only when paddle swig api is compiled
-  add_subdirectory(paddle/v2/tests)
-  add_subdirectory(paddle/v2/reader/tests)
-  add_subdirectory(paddle/v2/plot/tests)
+if (WITH_TESTING)
+  add_subdirectory(paddle/trainer_config_helpers/tests)
+  if (WITH_SWIG_PY)
+    # enable v2 API unittest only when paddle swig api is compiled
+    add_subdirectory(paddle/v2/tests)
+    add_subdirectory(paddle/v2/reader/tests)
+    add_subdirectory(paddle/v2/plot/tests)
+  endif()
endif()
install(DIRECTORY ${PADDLE_PYTHON_PACKAGE_DIR}
    DESTINATION opt/paddle/share/wheels
)
@@ -72,9 +72,16 @@ class InputType(object):
def dense_slot(dim, seq_type=SequenceType.NO_SEQUENCE):
    """
-   Dense Vector. It means the input feature is dense float vector. For example,
-   if the input is an image with 28*28 pixels, the input of Paddle neural
-   network should be a dense vector with dimension 784.
+   Dense Array. It means the input feature is a dense array of floats.
+   For example, if the input is an image with 28*28 pixels, the input of
+   the Paddle neural network could be a dense vector with dimension 784 or a
+   numpy array with shape (28, 28).
+
+   For the 2-D convolution operation, each sample in one mini-batch must
+   currently have the same size in PaddlePaddle, but variable-dimension
+   features across mini-batches are supported. In the variable-dimension
+   case, the param dim is not used; the data reader must yield numpy arrays,
+   and the data feeder will set the data shape correctly.

    :param dim: dimension of this vector.
    :type dim: int
@@ -135,6 +142,10 @@ sparse_binary_vector = sparse_non_value_slot
sparse_vector = sparse_value_slot
integer_value = index_slot
+# dense_array can be used for variable-length input features.
+# Each feature is not a vector, but a multi-dimensional array.
+dense_array = dense_slot
def dense_vector_sequence(dim):
    """
......
@@ -110,15 +110,16 @@ class ParameterAttribute(object):
                 momentum=None,
                 gradient_clipping_threshold=None,
                 sparse_update=False):
-       # initialize strategy.
+       self.attr = {}
        if is_static:
-           self.attr = {'is_static': True}
-       elif initial_std is None and initial_mean is None and initial_max \
+           self.attr['is_static'] = True
+       if initial_std is None and initial_mean is None and initial_max \
                is None and initial_min is None:
-           self.attr = {'initial_smart': True}
+           self.attr['initial_smart'] = True
        elif is_compatible_with(initial_std, float) or \
                is_compatible_with(initial_mean, float):
-           self.attr = dict()
            if initial_std is not None:
                self.attr['initial_std'] = initial_std
            if initial_mean is not None:
@@ -131,7 +132,6 @@ class ParameterAttribute(object):
            assert initial_min < initial_max
            initial_mean = (initial_max + initial_min) / 2
            initial_std = initial_mean - initial_min
-           self.attr = dict()
            self.attr['initial_mean'] = initial_mean
            self.attr['initial_std'] = initial_std
            self.attr['initial_strategy'] = 1 # Uniform Random
......
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.trainer.config_parser as config_parser
'''
This file is a wrapper of the formal config_parser. The main idea of this file is to
separate different config logic into different functions, such as network
configuration and optimizer configuration.
'''
__all__ = [
"parse_trainer_config", "parse_network_config", "parse_optimizer_config"
]
def parse_trainer_config(trainer_conf, config_arg_str):
return config_parser.parse_config(trainer_conf, config_arg_str)
def parse_network_config(network_conf):
config = config_parser.parse_config(network_conf, '')
return config.model_config
def parse_optimizer_config(optimizer_conf):
config = config_parser.parse_config(optimizer_conf, '')
return config.opt_config
@@ -16,7 +16,8 @@ import paddle.trainer.PyDataProvider2 as pydp2
import_list = [
    nm for nm in dir(pydp2)
-   if '_' in nm and nm[0] != '_' and ('value' in nm or 'vector' in nm)
+   if '_' in nm and nm[0] != '_' and ('value' in nm or 'vector' in nm or
+                                      'array' in nm)
]
import_list.extend(['InputType'])
......
@@ -360,7 +360,7 @@ mixed.__doc__ = conf_helps.mixed_layer.__doc__
class RecurrentLayerInput(Layer):
-   def __init__(self, recurrent_name, index, parent_layers):
+   def __init__(self, recurrent_name, index, parent_layers, reverse):
        parents_len = len(parent_layers)
        assert parents_len <= 1
        if parents_len == 0:
@@ -368,6 +368,7 @@ class RecurrentLayerInput(Layer):
        else:
            self.__parents__ = parent_layers.values()[0]
        self.__recurrent_name__ = recurrent_name
+       self.__reverse__ = reverse
        name = self.__parents__[
            index].name if index >= 0 else self.context_name()
        super(RecurrentLayerInput, self).__init__(
@@ -380,7 +381,8 @@ class RecurrentLayerInput(Layer):
        model_type('recurrent_nn')
        RecurrentLayerGroupWithoutOutLinksBegin(
            name=self.__recurrent_name__,
-           in_links=map(lambda x: x.name, self.__parents__))
+           in_links=map(lambda x: x.name, self.__parents__),
+           seq_reversed=self.__reverse__)
        return self
@@ -461,7 +463,7 @@ del each_layer_name
@wrap_name_default()
-def recurrent_group(step, input, name=None):
+def recurrent_group(step, input, reverse=False, name=None):
    if not isinstance(input, collections.Sequence):
        input = [input]
@@ -471,14 +473,14 @@ def recurrent_group(step, input, name=None):
        RecurrentLayerInput(
            recurrent_name=name,
            index=i,
-           parent_layers={'recurrent_inputs': non_static_inputs})
-       for i in xrange(len(non_static_inputs))
+           parent_layers={'recurrent_inputs': non_static_inputs},
+           reverse=reverse) for i in xrange(len(non_static_inputs))
    ]
    extra_input = None
    if len(non_static_inputs) == 0:
        extra_input = RecurrentLayerInput(
-           recurrent_name=name, index=-1, parent_layers={})
+           recurrent_name=name, index=-1, parent_layers={}, reverse=reverse)
    def __real_step__(*args):
        rnn_input = list(args)
......
@@ -233,6 +233,30 @@ class DataFeederTest(unittest.TestCase):
            self.assertEqual(out_sparse.getSparseRowCols(i), data[i][1])
            self.assertEqual(out_index[i], data[i][0])
+   def test_dense_set_shape(self):
+       # test 2-D data
+       def gen_data(batch_size, shape):
+           data = []
+           for i in xrange(batch_size):
+               each_sample = []
+               each_sample.append(np.random.random(shape))
+               data.append(each_sample)
+           return data
+
+       feeder = DataFeeder([('image', data_type.dense_array(2352))],
+                           {'image': 0})
+       arg = feeder(gen_data(32, (3, 28, 28)))
+       h = arg.getSlotFrameHeight(0)
+       w = arg.getSlotFrameWidth(0)
+       self.assertEqual(h, 28)
+       self.assertEqual(w, 28)
+
+       arg = feeder(gen_data(32, (3, 30, 32)))
+       h = arg.getSlotFrameHeight(0)
+       w = arg.getSlotFrameWidth(0)
+       self.assertEqual(h, 30)
+       self.assertEqual(w, 32)
if __name__ == '__main__':
    api.initPaddle("--use_gpu=0")
......
@@ -42,7 +42,8 @@ class RNNTest(unittest.TestCase):
        def test():
            data = conf_helps.data_layer(name="word", size=dict_dim)
            embd = conf_helps.embedding_layer(input=data, size=word_dim)
-           conf_helps.recurrent_group(name="rnn", step=step, input=embd)
+           conf_helps.recurrent_group(
+               name="rnn", step=step, input=embd, reverse=True)
        return str(parse_network(test))
@@ -60,7 +61,7 @@ class RNNTest(unittest.TestCase):
            name="word", type=data_type.integer_value(dict_dim))
        embd = layer.embedding(input=data, size=word_dim)
        rnn_layer = layer.recurrent_group(
-           name="rnn", step=new_step, input=embd)
+           name="rnn", step=new_step, input=embd, reverse=True)
        return str(layer.parse_network(rnn_layer))
        diff = difflib.unified_diff(parse_old_rnn().splitlines(1),
......