Commit 9a831868 authored by Yu Yang, committed by GitHub

Merge branch 'develop' into feature/clean_parameter_functionalities

@@ -49,6 +49,7 @@ before_install:
  # Paddle is using protobuf 3.1 currently. Protobuf 3.2 breaks the compatibility. So we specify the python
  # protobuf version.
  - pip install numpy wheel 'protobuf==3.1' sphinx==1.5.6 recommonmark sphinx-rtd-theme==0.1.9 virtualenv pre-commit requests==2.9.2 LinkChecker
  - pip install rarfile
  - |
    function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; }
script:
...
@@ -56,7 +56,8 @@ RUN pip install --upgrade pip && \
    pip install -U docopt PyYAML sphinx && \
    pip install -U sphinx-rtd-theme==0.1.9 recommonmark && \
    pip install pre-commit 'requests==2.9.2' 'ipython==5.3.0' && \
    pip install 'ipykernel==4.6.0' 'jupyter==1.0.0' && \
    pip install rarfile

# To fix https://github.com/PaddlePaddle/Paddle/issues/1954, we use
# the solution in https://urllib3.readthedocs.io/en/latest/user-guide.html#ssl-py2
...
@@ -14,6 +14,28 @@
INCLUDE(ExternalProject)

macro(PROMPT_PROTOBUF_LIB)
    MESSAGE(STATUS "Protobuf protoc executable: ${PROTOBUF_PROTOC_EXECUTABLE}")
    MESSAGE(STATUS "Protobuf library: ${PROTOBUF_LIBRARY}")
    INCLUDE_DIRECTORIES(${PROTOBUF_INCLUDE_DIR})
    RETURN()
endmacro()

set(PROTOBUF_ROOT "" CACHE PATH "Folder containing protobuf")
if (NOT "${PROTOBUF_ROOT}" STREQUAL "")
    find_path(PROTOBUF_INCLUDE_DIR google/protobuf/message.h PATHS ${PROTOBUF_ROOT}/include)
    find_library(PROTOBUF_LIBRARY protobuf PATHS ${PROTOBUF_ROOT}/lib)
    find_library(PROTOBUF_LITE_LIBRARY protobuf-lite PATHS ${PROTOBUF_ROOT}/lib)
    find_library(PROTOBUF_PROTOC_LIBRARY protoc PATHS ${PROTOBUF_ROOT}/lib)
    find_program(PROTOBUF_PROTOC_EXECUTABLE protoc PATHS ${PROTOBUF_ROOT}/bin)
    if (PROTOBUF_INCLUDE_DIR AND PROTOBUF_LIBRARY AND PROTOBUF_LITE_LIBRARY AND PROTOBUF_PROTOC_LIBRARY AND PROTOBUF_PROTOC_EXECUTABLE)
        message(STATUS "Using custom protobuf library in ${PROTOBUF_ROOT}.")
        PROMPT_PROTOBUF_LIB()
    else()
        message(WARNING "Cannot find protobuf library in ${PROTOBUF_ROOT}.")
    endif()
endif()
FUNCTION(build_protobuf TARGET_NAME BUILD_FOR_HOST)
    SET(PROTOBUF_SOURCES_DIR ${THIRD_PARTY_PATH}/${TARGET_NAME})
    SET(PROTOBUF_INSTALL_DIR ${THIRD_PARTY_PATH}/install/${TARGET_NAME})
@@ -107,6 +129,4 @@ IF(NOT PROTOBUF_FOUND)
    SET(PROTOBUF_PROTOC_LIBRARY ${protobuf_PROTOC_LIBRARY} CACHE FILEPATH "protoc library." FORCE)
ENDIF(NOT PROTOBUF_FOUND)

PROMPT_PROTOBUF_LIB()
@@ -150,7 +150,8 @@ endfunction()
# which takes care of making correct running environment
function(add_python_test TEST_NAME)
    add_test(NAME ${TEST_NAME}
        COMMAND env PADDLE_PACKAGE_DIR=${PADDLE_PYTHON_PACKAGE_DIR}
            bash ${PROJ_ROOT}/paddle/scripts/run_python_tests.sh
            ${USE_VIRTUALENV_FOR_TEST} ${PYTHON_EXECUTABLE} ${ARGN}
        WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
endfunction()
@@ -10,7 +10,7 @@ A dataset is a list of files in *RecordIO* format. A RecordIO file consists of c

## Task Queue

As mentioned in [distributed training design doc](./README.md), a *task* is a data shard that the master server assigns to the trainer process to train on. A task consists of one or multiple *chunks* from one or multiple files. The master server maintains *task queues* to track the training progress.

### Task Queue Creation

@@ -21,23 +21,23 @@ As mentioned in [distributed training design doc](./README.md), a *task* is a da
   func (m *RPCServer) ReportDataset(Paths []string, dummy *int) error {
   }
   ```
1. The master server will scan through each RecordIO file to generate the *chunk index* and learn how many chunks each file has. A chunk can be referenced by the file path and the index of the chunk within the file. The chunk index is an in-memory data structure that enables fast access to each chunk, and the index of the chunk within the file is an integer starting from 0, representing the n-th chunk within the file.

   The definition of the chunk is:

   ```go
   type Chunk struct {
       Idx   int // index of the chunk within the file
       Path  string
       Index recordio.Index // chunk index
   }
   ```

1. Chunks are grouped into tasks, and tasks are filled into the todo queue. The pending queue and the done queue are initialized with no element (a sketch of this grouping step follows the definition below).

   The definition of the task is:

   ```go
   type Task struct {
       Index  int
       Chunks []Chunk
   }
   ```
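For illustration only, the grouping step can be sketched in Python (a hedged sketch; it mirrors the Go `partition` function this commit adds in `master/service.go`, with dict literals standing in for the Go structs):

```python
def partition(chunks, chunks_per_task):
    """Group chunks into tasks; mirrors partition() in master/service.go."""
    if chunks_per_task <= 0:
        chunks_per_task = 1
    tasks = []
    for start in range(0, len(chunks), chunks_per_task):
        tasks.append({"ID": len(tasks),
                      "Chunks": chunks[start:start + chunks_per_task]})
    return tasks  # becomes the initial todo queue; pending/done start empty
```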
...
@@ -55,7 +55,7 @@ The trainer select process is encapsulated in the C API function:

```c
int paddle_begin_init_params(paddle_pserver_client* client, const char* config_proto);
```

The selected trainer's call to `paddle_begin_init_params` will return 1, and the other trainers' calls will return 0. `paddle_get_params` will block until initialization is completed, as illustrated below:

<img src="./src/pserver_init.png">
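To make the new semantics concrete, here is a hedged Python model of the protocol (illustrative only; the real interface is the C API in this document). Exactly one trainer "wins" `begin_init_params`; all others get 0 immediately, and `get_params` blocks until `finish_init_params` has run:

```python
import threading

_selected = threading.Semaphore(1)  # at most one trainer is selected
_init_done = threading.Event()      # set once initialization completes

def begin_init_params():
    # Returns 1 for exactly one caller, 0 for all others, without blocking.
    return 1 if _selected.acquire(blocking=False) else 0

def finish_init_params():
    _init_done.set()

def get_params():
    _init_done.wait()  # blocks until initialization is completed
    return {}          # stand-in for the parameters fetched from pservers
```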
@@ -89,16 +89,13 @@ void paddle_pserver_client_release(paddle_pserver_client* client);
 *
 * paddle_begin_init_params will be called from multiple trainers,
 * only one trainer will be selected to initialize the parameters on
 * parameter servers. Other trainers need to get the initialized
 * parameters from parameter servers using @paddle_get_params.
 *
 * @return 1 if the trainer is selected to initialize parameter
 * servers, otherwise 0.
 */
int paddle_begin_init_params(paddle_pserver_client* client);

/**
 * @brief paddle_init_param initializes the parameter on parameter
@@ -106,12 +103,13 @@ int paddle_begin_init_params(paddle_pserver_client* client, const char* pserver_
 *
 * @param param the parameter to initialize.
 * @param param_config_proto the configuration for the parameter.
 * @param config_len the length of param_config_proto
 * @return 0 if successful, otherwise -1. On failure, the trainer
 * needs to restart the entire initialization process (starting from
 * @paddle_begin_init_param). Or simply exit the program and wait for
 * the cluster management system to restart the trainer.
 */
int paddle_init_param(paddle_pserver_client* client, paddle_parameter param, const unsigned char* param_config_proto, int config_len);

/**
 * @brief paddle_finish_init_params tells parameter servers client has
...
# Design Doc: The C++ Class `Parameters`

`Parameters` is a concept we designed in the Paddle V2 API. It is a container of parameters, which lets Paddle share parameters between topologies. We described the usage of `Parameters` in [api.md](./api.md).

When designing the V2 API, we implemented `Parameters` in Python. The current implementation has several defects:

* We just use `memcpy` to share parameters between topologies, which is very inefficient.
* We did not implement sharing `Parameters` while training; we only trigger a `memcpy` when training starts.

It is necessary to implement `Parameters` on the C++ side. However, this requires refactoring Paddle, because Paddle was previously designed to train only one topology, i.e., each `GradientMachine` contains its `Parameter`s as data members. In the current Paddle implementation, there are three concepts associated with `Parameters`:

1. `paddle::Parameter`. A `Parameters` object is a container for `paddle::Parameter`.
It is evident that we should use `paddle::Parameter` when developing `Parameters`.
However, the `Parameter` class contains many functions and does not have a clear interface.
It mixes `create/store Parameter`, `serialize/deserialize`, `optimize (i.e., SGD)`, and `randomize/zero`.
When developing `Parameters`, we only use the `create/store Parameter` functionality.
We should extract the functionalities of `Parameter` into separate classes to clean up the Paddle C++ implementation.

2. `paddle::GradientMachine` and its sub-classes, e.g., `paddle::MultiGradientMachine`, `paddle::NeuralNetwork`.
We should pass `Parameters` to `paddle::GradientMachine` in `forward`/`backward` to avoid `memcpy` between topologies.
Also, we should handle multi-GPU/CPU training, because `forward` and `backward` run on multiple GPUs and CPUs.
`Parameters` should dispatch the parameter value to each device and gather the parameter gradients from each device.

3. `paddle::ParameterUpdater`. The `ParameterUpdater` is used to update parameters in Paddle.
So `Parameters` should be used by `paddle::ParameterUpdater`, and `paddle::ParameterUpdater` should optimize `Parameters` (e.g., by SGD).

The step-by-step approach for implementing `Parameters` in the Paddle C++ core is listed below. Each step should be a PR that can be merged into Paddle one by one.

1. Clean up the `paddle::Parameter` interface. Extract the functionalities of `paddle::Parameter` to prepare for the implementation of `Parameters`.

2. Implement a `Parameters` class. It just stores the `paddle::Parameter`s inside. Make `GradientMachine` use `Parameters` as a class member.

3. Make `Parameters` support multi-CPU and multi-GPU training, to prepare for sharing `Parameter`s between topologies.
Because we need to share `Parameters` between topologies, it is `Parameters`'s responsibility to exchange parameters between GPUs.
`GradientMachine` should not handle how to exchange parameters, because `GradientMachine` only trains one topology and we need to support training many topologies in Paddle, i.e., there could be many `GradientMachine`s using one `Parameters` object.
   * We should use a global function to exchange parameters between GPUs, not a member function of `Parameters`. `MultiGradientMachine` invokes this function, which takes `Parameters` as input.
   * `MultiGradientMachine` contains many functionalities. Extracting the parameter-exchange logic would make `MultiGradientMachine` clearer and simpler.

4. Make `Parameters` an argument of the `forward`/`backward` functions, not a data member of `GradientMachine`. For example, `forward` could be `forward(const Parameters& params, ...)` and `backward` could be `backward(Parameters* params, ...)`. After this step, Paddle can share `Parameters` between topologies.

5. `ParameterUpdater` is invoked by `GradientMachine` and `Trainer`, but it updates `Parameters`. At the end of this refactoring, we can change `ParameterUpdater` to use `Parameters` directly, which makes `ParameterUpdater`'s implementation clear.
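To make the end state concrete, here is a minimal sketch with stub classes (all names and bodies are illustrative, not the real C++ interfaces): several `GradientMachine`s share one `Parameters` object, and the `ParameterUpdater` optimizes it directly, so no `memcpy` happens between topologies.

```python
class Parameters(dict):
    """Stub container: parameter name -> value."""

class GradientMachine(object):
    def __init__(self, topology):
        self.topology = topology

    def forward(self, params, batch):
        return sum(batch) * params["w"]  # stand-in computation

    def backward(self, params, loss):
        params["w_grad"] = loss          # stand-in gradient

class ParameterUpdater(object):
    def update(self, params, lr=0.01):
        params["w"] -= lr * params.pop("w_grad", 0.0)

params = Parameters(w=1.0)
gm_a = GradientMachine("topology_a")
gm_b = GradientMachine("topology_b")
updater = ParameterUpdater()

loss_a = gm_a.forward(params, [1.0, 2.0])
gm_a.backward(params, loss_a)
updater.update(params)                # updates the shared parameters in place
loss_b = gm_b.forward(params, [3.0])  # same Parameters object, no copy
```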
# DeepSpeech2 on PaddlePaddle: Design Doc

We are planning to build Deep Speech 2 (DS2) \[[1](#references)\], a powerful Automatic Speech Recognition (ASR) engine, on PaddlePaddle. For the first-stage plan, we have the following short-term goals:

- Release a basic distributed implementation of DS2 on PaddlePaddle.
- Contribute a chapter of Deep Speech to the PaddlePaddle Book.

Intensive system optimization and a low-latency inference library (details in \[[1](#references)\]) are not yet covered in this first-stage plan.
## Table of Contents
- [Tasks](#tasks)
- [Task Dependency](#task-dependency)
- [Design Details](#design-details)
- [Overview](#overview)
- [Row Convolution](#row-convolution)
- [Beam Search With CTC and LM](#beam-search-with-ctc-and-lm)
- [Future Work](#future-work)
- [References](#references)
## Tasks
We roughly break down the project into 14 tasks:
1. Develop an **audio data provider**:
- JSON filelist generator.
- Audio file format transformer.
- Spectrogram feature extraction, power normalization, etc.
- Batch data reader with SortaGrad.
- Data augmentation (optional).
- Prepare (one or more) public English data sets & baseline.
2. Create a **simplified DS2 model configuration**:
- With only fixed-length (by padding) audio sequences (otherwise need *Task 3*).
- With only bidirectional-GRU (otherwise need *Task 4*).
- With only greedy decoder (otherwise need *Task 5, 6*).
3. Add support for **variable-shaped** dense-vector (image) batches of input data.
- Update `DenseScanner` in `dataprovider_converter.py`, etc.
4. Develop a new **lookahead-row-convolution layer** (See \[[1](#references)\] for details):
- Lookahead convolution windows.
- Within-row convolution, without kernels shared across rows.
5. Build KenLM **language model** (5-gram) for beam search decoder:
- Use KenLM toolkit.
- Prepare the corpus & train the model.
- Create inference interfaces (for Task 6).
6. Develop a **beam search decoder** with CTC + LM + WORDCOUNT:
- Beam search with CTC.
- Beam search with external custom scorer (e.g. LM).
- Try to design a more general beam search interface.
7. Develop a **Word Error Rate evaluator**:
- Update `ctc_error_evaluator` (CER) to support WER.
8. Prepare internal dataset for Mandarin (optional):
- Dataset, baseline, evaluation details.
- Particular data preprocessing for Mandarin.
- Might require cooperation with the Speech Department.
9. Create **standard DS2 model configuration**:
- With variable-length audio sequences (need *Task 3*).
- With unidirectional-GRU + row-convolution (need *Task 4*).
- With CTC-LM beam search decoder (need *Task 5, 6*).
10. Make it run perfectly on **clusters**.
11. Experiments and **benchmarking** (for accuracy, not efficiency):
- With public English dataset.
- With internal (Baidu) Mandarin dataset (optional).
12. Time **profiling** and optimization.
13. Prepare **docs**.
14. Prepare PaddlePaddle **Book** chapter with a simplified version.
## Task Dependency
Tasks parallelizable within phases:
Roadmap | Description | Parallelizable Tasks
----------- | :------------------------------------ | :--------------------
Phase I | Simplified model & components | *Task 1* ~ *Task 8*
Phase II | Standard model & benchmarking & profiling | *Task 9* ~ *Task 12*
Phase III | Documentation | *Task 13* ~ *Task 14*
An issue for each task will be created later. Contributions, discussions, and comments are all highly appreciated and welcome!
## Design Details
### Overview
Traditional **ASR** (Automatic Speech Recognition) pipelines require great human effort devoted to elaborately tuning multiple hand-engineered components (e.g. audio feature design, acoustic model, pronunciation model, language model, etc.). **Deep Speech 2** (**DS2**) \[[1](#references)\], however, trains such ASR models in an end-to-end manner, replacing most intermediate modules with a single deep network architecture. By scaling up both the data and model sizes, DS2 achieves a very significant performance boost.

Please read the Deep Speech 2 \[[1](#references),[2](#references)\] papers for more background knowledge.
The classical DS2 network contains 15 layers (from bottom to top):

- **Two** data layers (audio spectrogram, transcription text)
- **Three** 2D convolution layers
- **Seven** uni-directional simple-RNN layers
- **One** lookahead row-convolution layer
- **One** fully-connected layer
- **One** CTC-loss layer

<div align="center">
<img src="image/ds2_network.png" width=350><br/>
Figure 1. Architecture of the Deep Speech 2 network.
</div>

We don't have to insist on this 2-3-7-1-1-1 depth \[[2](#references)\]. Similar networks with different depths might also work well. In \[[1](#references)\], the authors use a different depth (e.g. 2-2-3-1-1-1) for the final experiments.
Key ingredients of the layers:

- **Data Layers**:
  - Frame sequences of the audio **spectrogram** (computed with FFT).
  - Token sequences of the **transcription** text (labels).
  - These two types of sequences do not have the same length, thus a CTC-loss layer is required.
- **2D Convolution Layers**:
  - Not only temporal convolution, but also **frequency convolution**. Like a 2D image convolution, but with a variable dimension (i.e. the temporal dimension).
  - With striding for only the first convolution layer.
  - No pooling in any convolution layer.
- **Uni-directional RNNs**:
  - Uni-directional + row convolution: for low-latency inference.
  - Bi-directional + without row convolution: if we don't care about inference latency.
- **Row convolution**:
  - Looks only a few steps ahead into the features, instead of over the whole sequence as bi-directional RNNs do (see the formula sketch after this list).
  - Not necessary with bi-directional RNNs.
  - "**Row**" means convolutions are done within each frequency dimension (row), with no convolution kernels shared across rows.
- **Batch Normalization Layers**:
  - Added to all the above layers (except for the data and loss layers).
  - Sequence-wise normalization for RNNs: BatchNorm is only performed on the input-state projection and not the state-state projection, for efficiency considerations.
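For reference, the lookahead (row) convolution of \[[1](#references)\] can be written as follows (our reading of the paper; τ is the lookahead window size and d the feature dimension):

```latex
r_{t,i} = \sum_{j=1}^{\tau+1} W_{i,j} \, h_{t+j-1,\, i}, \qquad 1 \le i \le d
```

Each row i mixes only its own activations from the next τ time steps, which is what allows low-latency streaming inference.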
Required Components | PaddlePaddle Support | Need to Develop
:------------------------------------- | :-------------------------------------- | :-----------------------
Data Layer I (Spectrogram) | Not supported yet. | TBD (Task 3)
Data Layer II (Transcription) | `paddle.data_type.integer_value_sequence` | -
2D Convolution Layer | `paddle.layer.image_conv_layer` | -
DataType Converter (vec2seq) | `paddle.layer.block_expand` | -
Bi-/Uni-directional RNNs | `paddle.layer.recurrent_group` | -
Row Convolution Layer | Not supported yet. | TBD (Task 4)
CTC-loss Layer | `paddle.layer.warp_ctc` | -
Batch Normalization Layer | `paddle.layer.batch_norm` | -
CTC-Beam search | Not supported yet. | TBD (Task 6)
### Row Convolution
TODO by Assignees
### Beam Search with CTC and LM
TODO by Assignees
## Future Work
- Efficiency Improvement
- Accuracy Improvement
- Low-latency Inference Library
- Large-scale benchmarking
## References
1. Dario Amodei, et al. [Deep Speech 2: End-to-End Speech Recognition in English and Mandarin](http://proceedings.mlr.press/v48/amodei16.pdf). ICML 2016.
2. Dario Amodei, et al. [Deep Speech 2: End-to-End Speech Recognition in English and Mandarin](https://arxiv.org/abs/1512.02595). arXiv:1512.02595.
@@ -2,8 +2,10 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR})

go_library(adder SRCS adder.go)

if (WITH_TESTING)
    cc_test(cgo_test
        SRCS
        cgo_test.cc
        DEPS
        adder)
endif()
package main

import (
    "fmt"
    "net"
    "net/http"
    "net/rpc"
    "os"
    "path/filepath"
    "strconv"
    "strings"
    "time"

    "github.com/namsral/flag"

    "github.com/PaddlePaddle/Paddle/paddle/go/master"
    "github.com/PaddlePaddle/Paddle/paddle/go/recordio"
)

func main() {
    port := flag.Int("port", 8080, "port of the master server.")
    dataset := flag.String("training_dataset", "", "dataset: comma separated paths to RecordIO files, supports glob patterns.")
    faultTolerance := flag.Bool("fault_tolerance", false, "enable fault tolerance (requires etcd).")
    taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timeout duration.")
    taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timeout count for each task before it is declared a failed task.")
    chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.")
    flag.Parse()

    if *dataset == "" {
        panic("no dataset specified.")
    }

    if *faultTolerance {
        panic("fault tolerance not implemented.")
    }

    var chunks []master.Chunk
    var paths []string
    ss := strings.Split(*dataset, ",")
    fmt.Println(ss)
    for _, s := range ss {
        match, err := filepath.Glob(s)
        if err != nil {
            panic(err)
        }
        paths = append(paths, match...)
    }

    if len(paths) == 0 {
        panic("no valid dataset specified.")
    }

    for _, path := range paths {
        f, err := os.Open(path)
        if err != nil {
            panic(err)
        }

        index, err := recordio.LoadIndex(f)
        if err != nil {
            panic(err)
        }
        f.Close()

        count := index.NumChunks()
        for i := 0; i < count; i++ {
            chunk := master.Chunk{
                Idx:   i, // index of the chunk within the file
                Path:  path,
                Index: *index.ChunkIndex(i),
            }
            chunks = append(chunks, chunk)
        }
    }

    s := master.NewService(chunks, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
    err := rpc.Register(s)
    if err != nil {
        panic(err)
    }

    rpc.HandleHTTP()
    l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
    if err != nil {
        panic(err)
    }

    err = http.Serve(l, nil)
    if err != nil {
        panic(err)
    }
}
package main

import (
    "net"
    "net/http"
    "net/rpc"
    "strconv"

    "github.com/namsral/flag"

    "github.com/PaddlePaddle/Paddle/paddle/go/pserver"
)

func main() {
    port := flag.Int("port", 0, "port of the pserver")
    flag.Parse()

    s := pserver.NewService()
...
package master

import (
    "errors"
    "log"
    "sync"
    "time"

    "github.com/PaddlePaddle/Paddle/paddle/go/recordio"
)

const (
    targetTaskCount = 300
)

// errors
var (
    ErrNoMoreTask          = errors.New("no more task for current pass")
    ErrPendingTaskNotFound = errors.New("pending task not found")
)

// Service is the master server service.
type Service struct {
    timeoutDur time.Duration
    timeoutMax int

    mu         sync.Mutex
    taskQueues taskQueues
}

// Recover recovers service state from etcd.
func Recover() (*Service, error) {
    // TODO(helin): recover from snapshot state from etcd.
    return nil, nil
}

func partition(chunks []Chunk, chunksPerTask int) []taskEntry {
    id := 0
    if chunksPerTask <= 0 {
        chunksPerTask = 1
    }

    var result []taskEntry
    var cur taskEntry
    for i, c := range chunks {
        if i%chunksPerTask == 0 && len(cur.Task.Chunks) > 0 {
            cur.Task.ID = id
            id++
            result = append(result, cur)
            cur.Task.Chunks = nil
        }

        cur.Task.Chunks = append(cur.Task.Chunks, c)
    }

    if len(cur.Task.Chunks) > 0 {
        cur.Task.ID = id
        id++
        result = append(result, cur)
    }

    return result
}

// NewService creates a new service.
func NewService(chunks []Chunk, chunksPerTask int, timeoutDur time.Duration, timeoutMax int) *Service {
    s := &Service{}
    s.timeoutDur = timeoutDur
    s.timeoutMax = timeoutMax
    s.taskQueues = taskQueues{}
    s.taskQueues.Pending = make(map[int]taskEntry)
    s.taskQueues.Todo = partition(chunks, chunksPerTask)
    return s
}

// Chunk is a chunk of data consisting of several data instances.
type Chunk struct {
    Idx   int // index of the chunk within the file
    Path  string
    Index recordio.Index // chunk index
}

// Task is the basic unit of data instances assigned to trainers.
type Task struct {
    ID     int
    Chunks []Chunk
}

type taskEntry struct {
    Epoch      int
    NumTimeout int
    Task       Task
}

type taskQueues struct {
    Todo    []taskEntry
    Pending map[int]taskEntry // map from task ID to task entry
    Done    []taskEntry
    Failed  []Task
}

// snapshot *must* be called with s.mu held.
func (s *Service) snapshot() error {
    // TODO(helin): snapshot state on etcd.
    return nil
}

// GetTask gets a new task from the service.
func (s *Service) GetTask(dummy int, task *Task) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    if len(s.taskQueues.Todo) == 0 {
        return ErrNoMoreTask
    }

    t := s.taskQueues.Todo[0]
    t.Epoch++
    s.taskQueues.Todo = s.taskQueues.Todo[1:]
    s.taskQueues.Pending[t.Task.ID] = t
    *task = t.Task // return the assigned task to the RPC caller
    err := s.snapshot()
    if err != nil {
        return err
    }

    time.AfterFunc(s.timeoutDur, func(taskID int, epoch int) func() {
        return func() {
            s.mu.Lock()
            defer s.mu.Unlock()

            t, ok := s.taskQueues.Pending[taskID]
            if !ok {
                return
            }

            if t.Epoch != epoch {
                // new epoch, task launched after the
                // schedule of this timeout check.
                return
            }

            defer func() {
                err := s.snapshot()
                if err != nil {
                    log.Println(err)
                }
            }()

            delete(s.taskQueues.Pending, t.Task.ID)

            t.NumTimeout++
            if t.NumTimeout > s.timeoutMax {
                s.taskQueues.Failed = append(s.taskQueues.Failed, t.Task)
                return
            }

            s.taskQueues.Todo = append(s.taskQueues.Todo, t)
        }
    }(t.Task.ID, t.Epoch))
    return nil
}

// TaskFinished tells the service that a task is finished.
func (s *Service) TaskFinished(taskID int, dummy *int) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    t, ok := s.taskQueues.Pending[taskID]
    if !ok {
        return ErrPendingTaskNotFound
    }

    // task finished, reset timeout
    t.NumTimeout = 0
    s.taskQueues.Done = append(s.taskQueues.Done, t)
    delete(s.taskQueues.Pending, taskID)
    return s.snapshot()
}
package master

import "testing"

func TestPartitionCount(t *testing.T) {
    cs := make([]Chunk, 100)
    ts := partition(cs, 5)
    if len(ts) != 20 {
        t.Error(len(ts))
    }

    cs = make([]Chunk, 101)
    ts = partition(cs, 5)
    if len(ts) != 21 {
        t.Error(len(ts))
    }

    ts = partition(cs, 1)
    if len(ts) != 101 {
        t.Error(len(ts))
    }

    ts = partition(cs, 0)
    if len(ts) != 101 {
        t.Error(len(ts))
    }
}

func TestPartitionIndex(t *testing.T) {
    cs := make([]Chunk, 100)
    ts := partition(cs, 20)
    for i := range ts {
        if ts[i].Task.ID != i {
            t.Error(ts[i], i)
        }
    }
}
@@ -17,6 +17,7 @@ import collections
import swig_paddle
import numpy
import itertools
from functools import reduce

__all__ = ['DataProviderConverter']
@@ -65,6 +66,8 @@ class IScanner(object):
        :param argument: Output arguments object.
        :type argument: swig_paddle.Arguments
        :param dat: A sample of the input data.
        :type dat: A Python object, numpy.array, or list.
        :return:
        """
        pass
@@ -95,17 +98,35 @@ class DenseScanner(IScanner):
    def __init__(self, input_type, pos):
        IScanner.__init__(self, input_type, pos)
        self.__mat__ = None
        self.__shape__ = None
        self.__height__ = 0
        self.__dim__ = 0

    def pre_scan(self, dat):
        self.__height__ += 1
        if self.__shape__ is None:
            self.__shape__ = numpy.array(dat).shape
            if len(self.__shape__) > 3:
                raise ValueError(
                    "The dimension of input cannot be greater than 3.")
            self.__dim__ = reduce(lambda x, y: x * y, self.__shape__)
            if len(self.__shape__) == 1 and self.__dim__ != self.input_type.dim:
                raise ValueError(
                    "The data size must be equal to the size defined in the data layer.")
        else:
            if self.__shape__ != numpy.array(dat).shape:
                raise ValueError(
                    "The data shape must be the same within one mini-batch.")

    def finish_pre_scan(self, argument):
        self.__mat__ = numpy.ndarray(
            shape=(self.__height__, self.__dim__), dtype=numpy.float32)
        self.__height__ = 0

    def scan(self, dat):
        # It's better to use a NumPy array for speed.
        dat = numpy.array(dat)
        dat = dat.flatten()
        self.__mat__[self.__height__] = dat
        self.__height__ += 1
@@ -116,6 +137,14 @@ class DenseScanner(IScanner):
        m = swig_paddle.Matrix.createDenseFromNumpy(self.__mat__, True,
                                                    self.data_in_gpu)
        argument.setSlotValue(self.pos, m)
        if len(self.__shape__) > 1:
            # The last two dimensions are the frame height and width.
            # For example, the layout is CHW for a 3-D image feature,
            # where H and W are the frame height and width.
            h, w = self.__shape__[-2:]
            argument.setSlotFrameHeight(self.pos, h)
            argument.setSlotFrameWidth(self.pos, w)
        self.__shape__ = None


class SparseBinaryScanner(IScanner):
...
@@ -24,12 +24,21 @@ PYTHON=$1; shift

if [ $USE_VIRTUALENV_FOR_TEST -ne 0 ]; then
    rm -rf .test_env
    virtualenv .test_env
    unset PYTHONHOME
    unset PYTHONPATH
    source .test_env/bin/activate
    PYTHON=python
fi

$PYTHON -m pip install $SCRIPTPATH/../dist/*.whl

if [ "X${PADDLE_PACKAGE_DIR}" != "X" ]; then
    $PYTHON -m pip install ${PADDLE_PACKAGE_DIR}/*.whl
else
    export PYTHONPATH=$SCRIPTPATH/../../python/
fi

$PYTHON -m pip install ipython==5.3

for fn in "$@"
do
...
@@ -23,7 +23,9 @@ add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp

add_custom_target(paddle_python ALL DEPENDS
    ${OUTPUT_DIR}/.timestamp)

set(PADDLE_PYTHON_PACKAGE_DIR ${CMAKE_CURRENT_BINARY_DIR}/dist/)

add_subdirectory(paddle/trainer_config_helpers/tests)

if (WITH_SWIG_PY)
    # enable v2 API unittest only when paddle swig api is compiled
    add_subdirectory(paddle/v2/tests)
@@ -31,6 +33,6 @@ if (WITH_SWIG_PY)
    add_subdirectory(paddle/v2/plot/tests)
endif()

install(DIRECTORY ${PADDLE_PYTHON_PACKAGE_DIR}
    DESTINATION opt/paddle/share/wheels
)
@@ -72,9 +72,16 @@ class InputType(object):

def dense_slot(dim, seq_type=SequenceType.NO_SEQUENCE):
    """
    Dense Array. It means the input feature is a dense array of floats.
    For example, if the input is an image with 28*28 pixels, the input of
    the Paddle neural network could be a dense vector with dimension 784
    or a numpy array with shape (28, 28).

    For the 2-D convolution operation, each sample in one mini-batch must
    currently have the same size in PaddlePaddle. However, the size may
    vary across mini-batches; for such variable-dimension features, the
    param dim is not used. The data reader must yield numpy arrays, and
    the data feeder will set the data shape correctly.

    :param dim: dimension of this vector.
    :type dim: int
@@ -135,6 +142,10 @@ sparse_binary_vector = sparse_non_value_slot
sparse_vector = sparse_value_slot
integer_value = index_slot

# dense_array can be used for variable-dimension input features.
# Each feature is not a vector, but a multi-dimensional array.
dense_array = dense_slot
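# Illustrative usage (as exercised by the new DataFeeder test in this
# commit): a reader yields numpy arrays whose shape may vary across
# mini-batches, and the feeder infers the real shape, e.g.
#
#   feeder = DataFeeder([('image', dense_array(2352))], {'image': 0})
#   arg = feeder([[numpy.random.random((3, 28, 28))]])  # CHW sample
#
# The declared dim is not used for such variable-dimension input.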

def dense_vector_sequence(dim):
    """
...
@@ -110,15 +110,16 @@ class ParameterAttribute(object):
                 momentum=None,
                 gradient_clipping_threshold=None,
                 sparse_update=False):
        self.attr = {}

        if is_static:
            self.attr['is_static'] = True

        if initial_std is None and initial_mean is None and initial_max \
                is None and initial_min is None:
            self.attr['initial_smart'] = True
        elif is_compatible_with(initial_std, float) or \
                is_compatible_with(initial_mean, float):
            if initial_std is not None:
                self.attr['initial_std'] = initial_std
            if initial_mean is not None:
@@ -131,7 +132,6 @@ class ParameterAttribute(object):
            assert initial_min < initial_max
            initial_mean = (initial_max + initial_min) / 2
            initial_std = initial_mean - initial_min
            self.attr['initial_mean'] = initial_mean
            self.attr['initial_std'] = initial_std
            self.attr['initial_strategy'] = 1  # Uniform Random
...
@@ -16,7 +16,8 @@ import paddle.trainer.PyDataProvider2 as pydp2

import_list = [
    nm for nm in dir(pydp2)
    if '_' in nm and nm[0] != '_' and ('value' in nm or 'vector' in nm or
                                       'array' in nm)
]
import_list.extend(['InputType'])
...
@@ -24,8 +24,9 @@ import conll05
import uci_housing
import sentiment
import wmt14
import mq2007

__all__ = [
    'mnist', 'imikolov', 'imdb', 'cifar', 'movielens', 'conll05', 'sentiment',
    'uci_housing', 'wmt14', 'mq2007'
]
@@ -360,7 +360,7 @@ mixed.__doc__ = conf_helps.mixed_layer.__doc__

class RecurrentLayerInput(Layer):
    def __init__(self, recurrent_name, index, parent_layers, reverse):
        parents_len = len(parent_layers)
        assert parents_len <= 1
        if parents_len == 0:
@@ -368,6 +368,7 @@ class RecurrentLayerInput(Layer):
        else:
            self.__parents__ = parent_layers.values()[0]
        self.__recurrent_name__ = recurrent_name
        self.__reverse__ = reverse
        name = self.__parents__[
            index].name if index >= 0 else self.context_name()
        super(RecurrentLayerInput, self).__init__(
@@ -380,7 +381,8 @@ class RecurrentLayerInput(Layer):
        model_type('recurrent_nn')
        RecurrentLayerGroupWithoutOutLinksBegin(
            name=self.__recurrent_name__,
            in_links=map(lambda x: x.name, self.__parents__),
            seq_reversed=self.__reverse__)
        return self
@@ -461,7 +463,7 @@ del each_layer_name

@wrap_name_default()
def recurrent_group(step, input, reverse=False, name=None):
    if not isinstance(input, collections.Sequence):
        input = [input]
@@ -471,14 +473,14 @@ def recurrent_group(step, input, name=None):
        RecurrentLayerInput(
            recurrent_name=name,
            index=i,
            parent_layers={'recurrent_inputs': non_static_inputs},
            reverse=reverse) for i in xrange(len(non_static_inputs))
    ]

    extra_input = None
    if len(non_static_inputs) == 0:
        extra_input = RecurrentLayerInput(
            recurrent_name=name, index=-1, parent_layers={}, reverse=reverse)

    def __real_step__(*args):
        rnn_input = list(args)
...
if (NOT APPLE)
    # The Mac OS X backend will not be able to function correctly if Python is
    # not installed as a framework.
    add_python_test(test_ploter test_ploter.py)
endif()
@@ -233,6 +233,30 @@ class DataFeederTest(unittest.TestCase):
            self.assertEqual(out_sparse.getSparseRowCols(i), data[i][1])
            self.assertEqual(out_index[i], data[i][0])

    def test_dense_set_shape(self):
        # test 2-D data
        def gen_data(batch_size, shape):
            data = []
            for i in xrange(batch_size):
                each_sample = []
                each_sample.append(np.random.random(shape))
                data.append(each_sample)
            return data

        feeder = DataFeeder([('image', data_type.dense_array(2352))],
                            {'image': 0})
        arg = feeder(gen_data(32, (3, 28, 28)))
        h = arg.getSlotFrameHeight(0)
        w = arg.getSlotFrameWidth(0)
        self.assertEqual(h, 28)
        self.assertEqual(w, 28)

        arg = feeder(gen_data(32, (3, 30, 32)))
        h = arg.getSlotFrameHeight(0)
        w = arg.getSlotFrameWidth(0)
        self.assertEqual(h, 30)
        self.assertEqual(w, 32)


if __name__ == '__main__':
    api.initPaddle("--use_gpu=0")
...
@@ -42,7 +42,8 @@ class RNNTest(unittest.TestCase):
        def test():
            data = conf_helps.data_layer(name="word", size=dict_dim)
            embd = conf_helps.embedding_layer(input=data, size=word_dim)
            conf_helps.recurrent_group(
                name="rnn", step=step, input=embd, reverse=True)

        return str(parse_network(test))
@@ -60,7 +61,7 @@ class RNNTest(unittest.TestCase):
                name="word", type=data_type.integer_value(dict_dim))
            embd = layer.embedding(input=data, size=word_dim)
            rnn_layer = layer.recurrent_group(
                name="rnn", step=new_step, input=embd, reverse=True)
            return str(layer.parse_network(rnn_layer))

        diff = difflib.unified_diff(parse_old_rnn().splitlines(1),
...
from setuptools import setup

packages=['paddle',
          'paddle.proto',
          'paddle.trainer',
@@ -18,6 +19,8 @@ setup(name='paddle',
          "numpy",
          "protobuf==${PROTOBUF_VERSION}",
          "matplotlib",
          "opencv-python",
          "rarfile"
      ],
      packages=packages,
      package_dir={
...