diff --git a/.gitignore b/.gitignore index 2b30f7938c8a1672acd0a14b7051af12c37889fb..275173b9677bffe028152fe8eadb3384329aeb5a 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,6 @@ third_party/ *~ bazel-* third_party/ + +# clion workspace. +cmake-build-* diff --git a/cmake/external/any.cmake b/cmake/external/any.cmake index 8116f235d535917c03deb646ff4ec083a0cdadc7..62eea42692b4191e53d0bbb0805786fd15ac7944 100644 --- a/cmake/external/any.cmake +++ b/cmake/external/any.cmake @@ -18,3 +18,4 @@ ExternalProject_Add( ) add_definitions(-DANY_IMPL_ANY_CAST_MOVEABLE) +LIST(APPEND external_project_dependencies linb_any) \ No newline at end of file diff --git a/cmake/external/protobuf.cmake b/cmake/external/protobuf.cmake index b35e6839cdc2ee062a9066585f0c83948d87e385..7340394b1e1fad9e1893ac87d62febb8dd72751c 100644 --- a/cmake/external/protobuf.cmake +++ b/cmake/external/protobuf.cmake @@ -14,6 +14,34 @@ INCLUDE(ExternalProject) +macro(PROMPT_PROTOBUF_LIB) + MESSAGE(STATUS "Protobuf protoc executable: ${PROTOBUF_PROTOC_EXECUTABLE}") + MESSAGE(STATUS "Protobuf library: ${PROTOBUF_LIBRARY}") + MESSAGE(STATUS "Protobuf version: ${PROTOBUF_VERSION}") + INCLUDE_DIRECTORIES(${PROTOBUF_INCLUDE_DIR}) + RETURN() +endmacro() +macro(SET_PROTOBUF_VERSION) + EXEC_PROGRAM(${PROTOBUF_PROTOC_EXECUTABLE} ARGS --version OUTPUT_VARIABLE PROTOBUF_VERSION) + STRING(REGEX MATCH "[0-9]+.[0-9]+" PROTOBUF_VERSION "${PROTOBUF_VERSION}") +endmacro() + +set(PROTOBUF_ROOT "" CACHE PATH "Folder contains protobuf") +if (NOT "${PROTOBUF_ROOT}" STREQUAL "") + find_path(PROTOBUF_INCLUDE_DIR google/protobuf/message.h PATHS ${PROTOBUF_ROOT}/include) + find_library(PROTOBUF_LIBRARY protobuf PATHS ${PROTOBUF_ROOT}/lib) + find_library(PROTOBUF_LITE_LIBRARY protobuf-lite PATHS ${PROTOBUF_ROOT}/lib) + find_library(PROTOBUF_PROTOC_LIBRARY protoc PATHS ${PROTOBUF_ROOT}/lib) + find_program(PROTOBUF_PROTOC_EXECUTABLE protoc PATHS ${PROTOBUF_ROOT}/bin) + if (PROTOBUF_INCLUDE_DIR AND PROTOBUF_LIBRARY AND PROTOBUF_LITE_LIBRARY AND PROTOBUF_PROTOC_LIBRARY AND PROTOBUF_PROTOC_EXECUTABLE) + message(STATUS "Using custom protobuf library in ${PROTOBUF_ROOT}.") + SET_PROTOBUF_VERSION() + PROMPT_PROTOBUF_LIB() + else() + message(WARNING "Cannot find protobuf library in ${PROTOBUF_ROOT}.") + endif() +endif() + FUNCTION(build_protobuf TARGET_NAME BUILD_FOR_HOST) SET(PROTOBUF_SOURCES_DIR ${THIRD_PARTY_PATH}/${TARGET_NAME}) SET(PROTOBUF_INSTALL_DIR ${THIRD_PARTY_PATH}/install/${TARGET_NAME}) @@ -78,8 +106,7 @@ IF(NOT CMAKE_CROSSCOMPILING) FIND_PACKAGE(Protobuf ${PROTOBUF_VERSION}) IF(PROTOBUF_FOUND) - EXEC_PROGRAM(${PROTOBUF_PROTOC_EXECUTABLE} ARGS --version OUTPUT_VARIABLE PROTOBUF_VERSION) - STRING(REGEX MATCH "[0-9]+.[0-9]+" PROTOBUF_VERSION "${PROTOBUF_VERSION}") + SET_PROTOBUF_VERSION() IF("${PROTOBUF_VERSION}" VERSION_LESS "3.1.0") SET(PROTOBUF_FOUND OFF) ENDIF() @@ -107,6 +134,4 @@ IF(NOT PROTOBUF_FOUND) SET(PROTOBUF_PROTOC_LIBRARY ${protobuf_PROTOC_LIBRARY} CACHE FILEPATH "protoc library." FORCE) ENDIF(NOT PROTOBUF_FOUND) -MESSAGE(STATUS "Protobuf protoc executable: ${PROTOBUF_PROTOC_EXECUTABLE}") -MESSAGE(STATUS "Protobuf library: ${PROTOBUF_LIBRARY}") -INCLUDE_DIRECTORIES(${PROTOBUF_INCLUDE_DIR}) +PROMPT_PROTOBUF_LIB() \ No newline at end of file diff --git a/doc/design/cluster_train/master_server.md b/doc/design/cluster_train/master_server.md index bb8307652587b4dc56cd668a3a5e64722734d194..4bf3c506f101361875043f8bfd97972b8c981a22 100644 --- a/doc/design/cluster_train/master_server.md +++ b/doc/design/cluster_train/master_server.md @@ -10,7 +10,7 @@ A dataset is a list of files in *RecordIO* format. A RecordIO file consists of c ## Task Queue -As mentioned in [distributed training design doc](./README.md), a *task* is a data shard that the master server assigns to the trainer process to train on. A task consists of one or multiple *blocks* from one or multiple files. The master server maintains *task queues* to track the training progress. +As mentioned in [distributed training design doc](./README.md), a *task* is a data shard that the master server assigns to the trainer process to train on. A task consists of one or multiple *chunks* from one or multiple files. The master server maintains *task queues* to track the training progress. ### Task Queue Creation @@ -21,23 +21,23 @@ As mentioned in [distributed training design doc](./README.md), a *task* is a da func (m *RPCServer) ReportDataset(Paths []string, dummy *int) error { } ``` -1. The master server will scan through each RecordIO file to generate the *block index* and know how many blocks does each file have. A block can be referenced by the file path and the index of the block within the file. The block index is in memory data structure that enables fast access to each block, and the index of the block with the file is an integer start from 0, representing the n-th block within the file. +1. The master server will scan through each RecordIO file to generate the *chunk index* and know how many chunks does each file have. A chunk can be referenced by the file path and the index of the chunk within the file. The chunk index is in memory data structure that enables fast access to each chunk, and the index of the chunk with the file is an integer start from 0, representing the n-th chunk within the file. - The definition of the block is: + The definition of the chunk is: ```go - type Block struct { - Idx int // index of the block within the file + type Chunk struct { + Idx int // index of the chunk within the file Path string - Index recordio.Index // block index + Index recordio.Index // chunk index } ``` -1. Blocks are grouped into tasks, and tasks are filled into the todo queue. The pending queue and the done queue are initialized with no element. +1. Chunks are grouped into tasks, and tasks are filled into the todo queue. The pending queue and the done queue are initialized with no element. The definition of the task is: ```go type Task struct { Index int - Blocks []Block + Chunks []Chunk } ``` diff --git a/doc/design/cluster_train/pserver_client.md b/doc/design/cluster_train/pserver_client.md index 392bab25e9de6bf5aa7cc1b0ad345ef12f1d9e5d..007285640e9f11c55715291774826620419cec66 100644 --- a/doc/design/cluster_train/pserver_client.md +++ b/doc/design/cluster_train/pserver_client.md @@ -55,7 +55,7 @@ The trainer select process is encapsulated in the C API function: ```c int paddle_begin_init_params(paddle_pserver_client* client, const char* config_proto); ``` -The selected trainer's call to `paddle_begin_init_params` will return with 1, and the other trainers' call to `paddle_begin_init_params` will block until initialization is done, and return 0. As illustrated below: +The selected trainer's call to `paddle_begin_init_params` will return with 1, and the other trainers' call to `paddle_begin_init_params` will return 0. `paddle_get_params` will be blocked until initialization is completed. As illustrated below: @@ -89,16 +89,13 @@ void paddle_pserver_client_release(paddle_pserver_client* client); * * paddle_begin_init_params will be called from multiple trainers, * only one trainer will be selected to initialize the parameters on - * parameter servers. Other trainers will be blocked until the - * initialization is done, and they need to get the initialized + * parameter servers. Other trainers need to get the initialized * parameters from parameter servers using @paddle_get_params. * - * @param pserver_config_proto serialized parameter server configuration in - * Protocol Buffers format. * @return 1 if the trainer is selected to initialize parameter * servers, otherwise 0. */ -int paddle_begin_init_params(paddle_pserver_client* client, const char* pserver_config_proto); +int paddle_begin_init_params(paddle_pserver_client* client); /** * @brief paddle_init_param initializes the parameter on parameter @@ -106,12 +103,13 @@ int paddle_begin_init_params(paddle_pserver_client* client, const char* pserver_ * * @param param the parameter to initialize. * @param param_config_proto the configuration for the parameter. + * @param config_len the length of param_config_proto * @return 0 if successful, otherwise -1. On failure, the trainer * needs to restart the entire initialization process (starting from * @paddle_begin_init_param). Or simply exit the program and wait for * the cluster management system to restart the trainer. */ -int paddle_init_param(paddle_pserver_client* client, paddle_parameter params, const char* param_config_proto); +int paddle_init_param(paddle_pserver_client* client, paddle_parameter param, const unsigned char* param_config_proto, int config_len); /** * @brief paddle_finish_init_params tells parameter servers client has diff --git a/doc/design/cluster_train/src/pserver_init.graffle b/doc/design/cluster_train/src/pserver_init.graffle index 730d3a561ffdc19e723b3cf6612471440951826a..5f3f1f52be8aa7f9049a8fcd6b7c93c8560c1676 100644 Binary files a/doc/design/cluster_train/src/pserver_init.graffle and b/doc/design/cluster_train/src/pserver_init.graffle differ diff --git a/doc/design/cluster_train/src/pserver_init.png b/doc/design/cluster_train/src/pserver_init.png index 4d502226d82ba271c50ae1bec5efaaaac4cc4434..dfe491ff98dd7db1c336093c80964a260df2cd90 100644 Binary files a/doc/design/cluster_train/src/pserver_init.png and b/doc/design/cluster_train/src/pserver_init.png differ diff --git a/doc/design/parameters_in_cpp.md b/doc/design/parameters_in_cpp.md new file mode 100644 index 0000000000000000000000000000000000000000..b6f99bc7d9d6fafacb0a4bcff806b65d9aef98cc --- /dev/null +++ b/doc/design/parameters_in_cpp.md @@ -0,0 +1,41 @@ +# Design Doc: The C++ Class `Parameters` + +`Parameters` is a concept we designed in Paddle V2 API. `Parameters` is a container of parameters, and make Paddle can shared parameter between topologies. We described usages of `Parameter` in [api.md](./api.md). + +We used Python to implement Parameters when designing V2 API before. There are several defects for current implementation: +* We just use `memcpy` to share Parameters between topologies, but this is very inefficient. +* We did not implement share Parameters while training. We just trigger `memcpy` when start training. + +It is necessary that we implement Parameters in CPP side. However, it could be a code refactoring for Paddle, because Paddle was designed for training only one topology before, i.e., each GradientMachine contains its Parameter as a data member. In current Paddle implementation, there are three concepts associated with `Parameters`: + +1. `paddle::Parameter`. A `Parameters` is a container for `paddle::Parameter`. +It is evident that we should use `paddle::Parameter` when developing `Parameters`. +However, the `Parameter` class contains many functions and does not have a clear interface. +It contains `create/store Parameter`, `serialize/deserialize`, `optimize(i.e SGD)`, `randomize/zero`. +When we developing `Parameters`, we only use `create/store Parameter` functionality. +We should extract functionalities of Parameter into many classes to clean Paddle CPP implementation. + +2. `paddle::GradientMachine` and its sub-classes, e.g., `paddle::MultiGradientMachine`, `paddle::NeuralNetwork`. +We should pass `Parameters` to `paddle::GradientMachine` when `forward/backward` to avoid `memcpy` between topologies. +Also, we should handle multi-GPU/CPU training, because `forward` and `backward` would perform on multi-GPUs and multi-CPUs. +`Parameters` should dispatch the parameter value to each device, and gather the parameter gradient from each device. + +3. `paddle::ParameterUpdater`. The ParameterUpdater is used to update parameters in Paddle. +So `Parameters` should be used by `paddle::ParameterUpdater`, and `paddle::ParameterUpdater` should optimize `Parameters` (by SGD). + + +The step by step approach for implementation Parameters in Paddle C++ core is listed below. Each step should be a PR and could be merged into Paddle one by one. + +1. Clean `paddle::Parameter` interface. Extract the functionalities of `paddle::Parameter` to prepare for the implementation of Parameters. + +2. Implementation a `Parameters` class. It just stores the `paddle::Parameter` inside. Make `GradientMachine` uses `Parameters` as a class member. + +3. Make `Parameters` support Multi-CPU and Multi-GPU training to prepare for sharing `Parameter` between topologies. +Because we need share `Parameters` between topologies, it is `Parameters`'s response to exchange Parameters between GPUs. +`GradientMachine` should not handle how to exchange Parameters because `GradientMachine` only used to train one topology and we need to support train many topologies in Paddle, i.e., there could be many GradientMachines use one `Parameters`. + * We should use a global function to exchange Parameters between GPUs, not a member function in `Parameters`. The `MultiGradientMachine` invoke this function, which uses `Parameters` as this function inputs. + * The MultiGradientMachine contains many functionalities. Extracting the Parameters exchanging logic could make MultiGradientMachine clearer and simpler. + +4. Make `Parameters` as an argument for `forward/backward` function, not a data member for `GradientMachine`. For example, `forward` could be `forward(const Parameters& params, ...)` and `backward` could be `backward(Parameters* params, ...)`. After this step, Paddle could share `Parameters` between topologies. + +5. `ParameterUpdater` is invoked by `GradientMachine` and `Trainer`, but it updates `Parameters`. In the end of this code refactoring, we could change `ParameterUpdater` directly uses `Parameters` to make `ParameterUpdater`'s implementation clear. diff --git a/doc/design/speech/README.MD b/doc/design/speech/README.MD new file mode 100644 index 0000000000000000000000000000000000000000..7304650e628dba210488cd2dc4836318b5383b2a --- /dev/null +++ b/doc/design/speech/README.MD @@ -0,0 +1,155 @@ +# DeepSpeech2 on PaddlePaddle: Design Doc + +We are planning to build Deep Speech 2 (DS2) \[[1](#references)\], a powerful Automatic Speech Recognition (ASR) engine, on PaddlePaddle. For the first-stage plan, we have the following short-term goals: + +- Release a basic distributed implementation of DS2 on PaddlePaddle. +- Contribute a chapter of Deep Speech to PaddlePaddle Book. + +Intensive system optimization and low-latency inference library (details in \[[1](#references)\]) are not yet covered in this first-stage plan. + +## Table of Contents + +- [Tasks](#tasks) +- [Task Dependency](#task-dependency) +- [Design Details](#design-details) + - [Overview](#overview) + - [Row Convolution](#row-convolution) + - [Beam Search With CTC and LM](#beam-search-with-ctc-and-lm) +- [Future Work](#future-work) +- [References](#references) + +## Tasks + +We roughly break down the project into 14 tasks: + +1. Develop an **audio data provider**: + - Json filelist generator. + - Audio file format transformer. + - Spectrogram feature extraction, power normalization etc. + - Batch data reader with SortaGrad. + - Data augmentation (optional). + - Prepare (one or more) public English data sets & baseline. +2. Create a **simplified DS2 model configuration**: + - With only fixed-length (by padding) audio sequences (otherwise need *Task 3*). + - With only bidirectional-GRU (otherwise need *Task 4*). + - With only greedy decoder (otherwise need *Task 5, 6*). +3. Develop to support **variable-shaped** dense-vector (image) batches of input data. + - Update `DenseScanner` in `dataprovider_converter.py`, etc. +4. Develop a new **lookahead-row-convolution layer** (See \[[1](#references)\] for details): + - Lookahead convolution windows. + - Within-row convolution, without kernels shared across rows. +5. Build KenLM **language model** (5-gram) for beam search decoder: + - Use KenLM toolkit. + - Prepare the corpus & train the model. + - Create infererence interfaces (for Task 6). +6. Develop a **beam search decoder** with CTC + LM + WORDCOUNT: + - Beam search with CTC. + - Beam search with external custom scorer (e.g. LM). + - Try to design a more general beam search interface. +7. Develop a **Word Error Rate evaluator**: + - update `ctc_error_evaluator`(CER) to support WER. +8. Prepare internal dataset for Mandarin (optional): + - Dataset, baseline, evaluation details. + - Particular data preprocessing for Mandarin. + - Might need cooperating with the Speech Department. +9. Create **standard DS2 model configuration**: + - With variable-length audio sequences (need *Task 3*). + - With unidirectional-GRU + row-convolution (need *Task 4*). + - With CTC-LM beam search decoder (need *Task 5, 6*). +10. Make it run perfectly on **clusters**. +11. Experiments and **benchmarking** (for accuracy, not efficiency): + - With public English dataset. + - With internal (Baidu) Mandarin dataset (optional). +12. Time **profiling** and optimization. +13. Prepare **docs**. +14. Prepare PaddlePaddle **Book** chapter with a simplified version. + +## Task Dependency + +Tasks parallelizable within phases: + +Roadmap | Description | Parallelizable Tasks +----------- | :------------------------------------ | :-------------------- +Phase I | Simplified model & components | *Task 1* ~ *Task 8* +Phase II | Standard model & benchmarking & profiling | *Task 9* ~ *Task 12* +Phase III | Documentations | *Task13* ~ *Task14* + +Issue for each task will be created later. Contributions, discussions and comments are all highly appreciated and welcomed! + +## Design Details + +### Overview + +Traditional **ASR** (Automatic Speech Recognition) pipelines require great human efforts devoted to elaborately tuning multiple hand-engineered components (e.g. audio feature design, accoustic model, pronuncation model and language model etc.). **Deep Speech 2** (**DS2**) \[[1](#references)\], however, trains such ASR models in an end-to-end manner, replacing most intermediate modules with only a single deep network architecture. With scaling up both the data and model sizes, DS2 achieves a very significant performance boost. + +Please read Deep Speech 2 \[[1](#references),[2](#references)\] paper for more background knowledge. + +The classical DS2 network contains 15 layers (from bottom to top): + +- **Two** data layers (audio spectrogram, transcription text) +- **Three** 2D convolution layers +- **Seven** uni-directional simple-RNN layers +- **One** lookahead row convolution layers +- **One** fully-connected layers +- **One** CTC-loss layer + +
+
+Figure 1. Archetecture of Deep Speech 2 Network. +
+ +We don't have to persist on this 2-3-7-1-1-1 depth \[[2](#references)\]. Similar networks with different depths might also work well. As in \[[1](#references)\], authors use a different depth (e.g. 2-2-3-1-1-1) for final experiments. + +Key ingredients about the layers: + +- **Data Layers**: + - Frame sequences data of audio **spectrogram** (with FFT). + - Token sequences data of **transcription** text (labels). + - These two type of sequences do not have the same lengthes, thus a CTC-loss layer is required. +- **2D Convolution Layers**: + - Not only temporal convolution, but also **frequency convolution**. Like a 2D image convolution, but with a variable dimension (i.e. temporal dimension). + - With striding for only the first convlution layer. + - No pooling for all convolution layers. +- **Uni-directional RNNs** + - Uni-directional + row convolution: for low-latency inference. + - Bi-direcitional + without row convolution: if we don't care about the inference latency. +- **Row convolution**: + - For looking only a few steps ahead into the feature, instead of looking into a whole sequence in bi-directional RNNs. + - Not nessesary if with bi-direcitional RNNs. + - "**Row**" means convolutions are done within each frequency dimension (row), and no convolution kernels shared across. +- **Batch Normalization Layers**: + - Added to all above layers (except for data and loss layer). + - Sequence-wise normalization for RNNs: BatchNorm only performed on input-state projection and not state-state projection, for efficiency consideration. + + +Required Components | PaddlePaddle Support | Need to Develop +:------------------------------------- | :-------------------------------------- | :----------------------- +Data Layer I (Spectrogram) | Not supported yet. | TBD (Task 3) +Data Layer II (Transcription) | `paddle.data_type.integer_value_sequence` | - +2D Convolution Layer | `paddle.layer.image_conv_layer` | - +DataType Converter (vec2seq) | `paddle.layer.block_expand` | - +Bi-/Uni-directional RNNs | `paddle.layer.recurrent_group` | - +Row Convolution Layer | Not supported yet. | TBD (Task 4) +CTC-loss Layer | `paddle.layer.warp_ctc` | - +Batch Normalization Layer | `paddle.layer.batch_norm` | - +CTC-Beam search | Not supported yet. | TBD (Task 6) + +### Row Convolution + +TODO by Assignees + +### Beam Search with CTC and LM + +TODO by Assignees + +## Future Work + +- Efficiency Improvement +- Accuracy Improvement +- Low-latency Inference Library +- Large-scale benchmarking + +## References + +1. Dario Amodei, etc., [Deep Speech 2 : End-to-End Speech Recognition in English and Mandarin](http://proceedings.mlr.press/v48/amodei16.pdf). ICML 2016. +2. Dario Amodei, etc., [Deep Speech 2 : End-to-End Speech Recognition in English and Mandarin](https://arxiv.org/abs/1512.02595). arXiv:1512.02595. diff --git a/doc/design/speech/image/ds2_network.png b/doc/design/speech/image/ds2_network.png new file mode 100644 index 0000000000000000000000000000000000000000..1a5b2184d47928cc2849d5a7c8ea2d8cf5337e11 Binary files /dev/null and b/doc/design/speech/image/ds2_network.png differ diff --git a/paddle/go/cmd/master/master.go b/paddle/go/cmd/master/master.go new file mode 100644 index 0000000000000000000000000000000000000000..ef1f87c2dd53b701810c82ae90eaf3f94ea15e47 --- /dev/null +++ b/paddle/go/cmd/master/master.go @@ -0,0 +1,93 @@ +package main + +import ( + "fmt" + "net" + "net/http" + "net/rpc" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/namsral/flag" + + "github.com/PaddlePaddle/Paddle/paddle/go/master" + "github.com/PaddlePaddle/Paddle/paddle/go/recordio" +) + +func main() { + port := flag.Int("port", 8080, "port of the master server.") + dataset := flag.String("training_dataset", "", "dataset: comma separated path to RecordIO paths, supports golb patterns.") + faultTolerance := flag.Bool("fault_tolerance", false, "enable fault tolerance (requires etcd).") + taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timout duration.") + taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timtout count for each task before it being declared failed task.") + chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.") + flag.Parse() + + if *dataset == "" { + panic("no dataset specified.") + } + + if *faultTolerance { + panic("fault tolernance not implemented.") + } + + var chunks []master.Chunk + var paths []string + ss := strings.Split(*dataset, ",") + fmt.Println(ss) + for _, s := range ss { + match, err := filepath.Glob(s) + if err != nil { + panic(err) + } + paths = append(paths, match...) + } + + if len(paths) == 0 { + panic("no valid datset specified.") + } + + idx := 0 + for _, path := range paths { + f, err := os.Open(path) + if err != nil { + panic(err) + } + + index, err := recordio.LoadIndex(f) + if err != nil { + panic(err) + } + f.Close() + + count := index.NumChunks() + for i := 0; i < count; i++ { + chunk := master.Chunk{ + Idx: idx, + Path: path, + Index: *index.ChunkIndex(i), + } + chunks = append(chunks, chunk) + } + } + + s := master.NewService(chunks, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax) + err := rpc.Register(s) + if err != nil { + panic(err) + } + + rpc.HandleHTTP() + l, err := net.Listen("tcp", ":"+strconv.Itoa(*port)) + if err != nil { + panic(err) + } + + err = http.Serve(l, nil) + if err != nil { + panic(err) + } +} diff --git a/paddle/go/cmd/pserver/pserver.go b/paddle/go/cmd/pserver/pserver.go index 41417875fb98aca3f2181841b28f7b220e948618..bd4bfc7028302df1c3e6ecd3cc9ebb11b158df01 100644 --- a/paddle/go/cmd/pserver/pserver.go +++ b/paddle/go/cmd/pserver/pserver.go @@ -1,17 +1,18 @@ package main import ( - "flag" "net" "net/http" "net/rpc" "strconv" + "github.com/namsral/flag" + "github.com/PaddlePaddle/Paddle/paddle/go/pserver" ) func main() { - port := flag.Int("p", 0, "port of the pserver") + port := flag.Int("port", 0, "port of the pserver") flag.Parse() s := pserver.NewService() diff --git a/paddle/go/master/service.go b/paddle/go/master/service.go new file mode 100644 index 0000000000000000000000000000000000000000..75266482870c448fcde7359640bc4773c200fecb --- /dev/null +++ b/paddle/go/master/service.go @@ -0,0 +1,178 @@ +package master + +import ( + "errors" + "log" + "sync" + "time" + + "github.com/PaddlePaddle/Paddle/paddle/go/recordio" +) + +const ( + targetTaskCount = 300 +) + +// errors +var ( + ErrNoMoreTask = errors.New("no more task for current pass") + ErrPendingTaskNotFound = errors.New("pending task not found") +) + +// Service is the master server service. +type Service struct { + timeoutDur time.Duration + timeoutMax int + + mu sync.Mutex + taskQueues taskQueues +} + +// Recover recovers service state from etcd. +func Recover() (*Service, error) { + // TODO(helin): recover from snapshot state from etcd. + return nil, nil +} + +func partition(chunks []Chunk, chunksPerTask int) []taskEntry { + id := 0 + if chunksPerTask <= 0 { + chunksPerTask = 1 + } + + var result []taskEntry + var cur taskEntry + for i, c := range chunks { + if i%chunksPerTask == 0 && len(cur.Task.Chunks) > 0 { + cur.Task.ID = id + id++ + result = append(result, cur) + cur.Task.Chunks = nil + } + + cur.Task.Chunks = append(cur.Task.Chunks, c) + } + + if len(cur.Task.Chunks) > 0 { + cur.Task.ID = id + id++ + result = append(result, cur) + } + + return result +} + +// NewService creates a new service. +func NewService(chunks []Chunk, chunksPerTask int, timeoutDur time.Duration, timeoutMax int) *Service { + s := &Service{} + s.timeoutDur = timeoutDur + s.timeoutMax = timeoutMax + s.taskQueues = taskQueues{} + s.taskQueues.Pending = make(map[int]taskEntry) + s.taskQueues.Todo = partition(chunks, chunksPerTask) + return s +} + +// Chunk is a chunk of data consisted of several data instances. +type Chunk struct { + Idx int // index of the chunk within the file + Path string + Index recordio.Index // block index +} + +// Task is the basic unit of data instances assigned to trainers. +type Task struct { + ID int + Chunks []Chunk +} + +type taskEntry struct { + Epoch int + NumTimeout int + Task Task +} + +type taskQueues struct { + Todo []taskEntry + Pending map[int]taskEntry // map from task ID to task entry + Done []taskEntry + Failed []Task +} + +// *must* be called with s.mu being held. +func (s *Service) snapshot() error { + // TODO(helin): snapshot state on etcd. + return nil +} + +// GetTask gets a new task from the service. +func (s *Service) GetTask(dummy int, task *Task) error { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.taskQueues.Todo) == 0 { + return ErrNoMoreTask + } + + t := s.taskQueues.Todo[0] + t.Epoch++ + s.taskQueues.Todo = s.taskQueues.Todo[1:] + s.taskQueues.Pending[t.Task.ID] = t + err := s.snapshot() + if err != nil { + return err + } + + time.AfterFunc(s.timeoutDur, func(taskID int, epoch int) func() { + return func() { + s.mu.Lock() + defer s.mu.Unlock() + + t, ok := s.taskQueues.Pending[taskID] + if !ok { + return + } + + if t.Epoch != epoch { + // new epoch, task launched after the + // schedule of this timeout check. + return + } + + defer func() { + err := s.snapshot() + if err != nil { + log.Println(err) + } + }() + + delete(s.taskQueues.Pending, t.Task.ID) + + t.NumTimeout++ + if t.NumTimeout > s.timeoutMax { + s.taskQueues.Failed = append(s.taskQueues.Failed, t.Task) + return + } + + s.taskQueues.Todo = append(s.taskQueues.Todo, t) + } + }(t.Task.ID, t.Epoch)) + return nil +} + +// TaskFinished tell the service that a task is finished. +func (s *Service) TaskFinished(taskID int, dummy *int) error { + s.mu.Lock() + defer s.mu.Unlock() + + t, ok := s.taskQueues.Pending[taskID] + if !ok { + return ErrPendingTaskNotFound + } + + // task finished, reset timeout + t.NumTimeout = 0 + s.taskQueues.Done = append(s.taskQueues.Done, t) + delete(s.taskQueues.Pending, taskID) + return s.snapshot() +} diff --git a/paddle/go/master/service_internal_test.go b/paddle/go/master/service_internal_test.go new file mode 100644 index 0000000000000000000000000000000000000000..bc435b505c014ca13ed5fc16b33a21ebb089a3b7 --- /dev/null +++ b/paddle/go/master/service_internal_test.go @@ -0,0 +1,37 @@ +package master + +import "testing" + +func TestPartitionCount(t *testing.T) { + cs := make([]Chunk, 100) + ts := partition(cs, 5) + if len(ts) != 20 { + t.Error(len(ts)) + } + + cs = make([]Chunk, 101) + ts = partition(cs, 5) + if len(ts) != 21 { + t.Error(len(ts)) + } + + ts = partition(cs, 1) + if len(ts) != 101 { + t.Error(len(ts)) + } + + ts = partition(cs, 0) + if len(ts) != 101 { + t.Error(len(ts)) + } +} + +func TestPartionIndex(t *testing.T) { + cs := make([]Chunk, 100) + ts := partition(cs, 20) + for i := range ts { + if ts[i].Task.ID != i { + t.Error(ts[i], i) + } + } +} diff --git a/paddle/math/BaseMatrix.h b/paddle/math/BaseMatrix.h index 6ed48c8d88ee698689de6f7a7f470b97a094ea5b..120d69f718b954925438fbd2119d69f0be13b3e9 100644 --- a/paddle/math/BaseMatrix.h +++ b/paddle/math/BaseMatrix.h @@ -758,7 +758,7 @@ public: T p3); // decayRate /// apply L1/L2 to *this* - void applyL1(T learningRate, T decayRate); + virtual void applyL1(T learningRate, T decayRate); void applyL1(BaseMatrixT& lr, T learningRate, T decayRate); void applyL2(T learningRate, T decayRate); void applyL2(BaseMatrixT& lr, T learningRate, T decayRate); diff --git a/paddle/math/SparseRowMatrix.cpp b/paddle/math/SparseRowMatrix.cpp index b8c781ca1fd46c9840817abe26a20eec005c37e9..b086433fe535225ad05453b7d13c3846f5ce3c2b 100644 --- a/paddle/math/SparseRowMatrix.cpp +++ b/paddle/math/SparseRowMatrix.cpp @@ -54,7 +54,7 @@ void SparseRowCpuMatrix::zeroMem() { clearRows(); } -void SparseRowCpuMatrix::applyL1Decay(real learningRate, real decayRate) { +void SparseRowCpuMatrix::applyL1(real learningRate, real decayRate) { apply([=](real* buf, size_t len) { CpuVector value(0, nullptr); value.subVecFrom(buf, 0, len); diff --git a/paddle/math/SparseRowMatrix.h b/paddle/math/SparseRowMatrix.h index 1ccbf97b25922ae52377d7048da3a07012d21003..8704eb038d5d42ca834d232c0a651e9ffb2b40f3 100644 --- a/paddle/math/SparseRowMatrix.h +++ b/paddle/math/SparseRowMatrix.h @@ -94,7 +94,7 @@ public: /** * apply L1 to all sparse rows, should be apply after indices ready. */ - void applyL1Decay(real learningRate, real decayRate); + virtual void applyL1(real learningRate, real decayRate); void clearIndices() { clearRows(); } void zeroMemThread(size_t tid, size_t numThreads); diff --git a/paddle/parameter/Parameter.cpp b/paddle/parameter/Parameter.cpp index b8efabbe2a0b54edec64f6cee62b44c76ca7bf10..ebe36d49376882fe4c1013e19dcf71f452b3e501 100644 --- a/paddle/parameter/Parameter.cpp +++ b/paddle/parameter/Parameter.cpp @@ -20,6 +20,7 @@ limitations under the License. */ #include "OptimizerFunctions.h" #include "OptimizerWithRegularizer.h" #include "ParameterUpdateFunctions.h" +#include "ThreadLocalBuffer.h" #include "hl_gpu.h" #include "paddle/math/CpuSparseMatrix.h" #include "paddle/math/MathUtils.h" @@ -262,15 +263,6 @@ void Parameter::setMat(ParameterType pType, int matType) { } } -SparsePrefetchRowCpuMatrix* Parameter::getPrefetchMatrix() { - MatrixPtr mat = mats_[PARAMETER_VALUE]; - if (mat) { - return dynamic_cast(mat.get()); - } - - return nullptr; -} - void Parameter::incUpdate(const UpdateCallback& callback) { // Static parameter is fixed, and does not need to be updated if (isStatic()) { @@ -422,37 +414,4 @@ bool Parameter::load(std::istream& s) { return true; } -ThreadLocal> Parameter::tlsTempBufs_; - -VectorPtr* Parameter::getTlsTempBufs() { - std::vector& bufs = *tlsTempBufs_; - if (bufs.empty()) { - bufs.resize(NUM_PARAMETER_TYPES); - for (auto& vec : bufs) { - vec.reset(new CpuVector(0, nullptr)); - } - } - return bufs.data(); -} - -void Parameter::exec(ExecFunc func) { - auto execFunc = [this, func](int tid, size_t numThreads) { - if (numThreads == 1) { // single thread - func(this->getBufs()); - } else { // multi thread - VectorPtr* vecs = Parameter::getTlsTempBufs(); - auto interval = calcSplitArrayInterval( - this->getSize(), (size_t)tid, numThreads, 8LU /*for avx*/); - for (size_t i = 0; i < (size_t)NUM_PARAMETER_TYPES; ++i) { - if (bufs_[i]) { - vecs[i]->subVecFrom(*bufs_[i], interval); - } - } - func(vecs); - } - }; - - getBuf(PARAMETER_VALUE)->exec(execFunc); -} - } // namespace paddle diff --git a/paddle/parameter/Parameter.h b/paddle/parameter/Parameter.h index 36d2b65f3bd1056a4ac6a1029000fe4cce6420ce..d77486ce42e049bf70cbe2a3feed0e203b2f5ac3 100644 --- a/paddle/parameter/Parameter.h +++ b/paddle/parameter/Parameter.h @@ -40,17 +40,6 @@ class Parameter; typedef std::function UpdateCallback; typedef std::function ParamInitCallback; -struct Segment { - int64_t beginDim; - int64_t endDim; - - // We allow the possibility that the parameters are not stored at contiguous - // memory locations for speed reason (i.e. data alignemnt) - // This means that the dimenstion is not same as the position in the memroy - // buffer. - int64_t beginPos; // beginning position in the local value or grad buffer -}; - class Parameter; typedef std::shared_ptr ParameterPtr; @@ -167,13 +156,6 @@ public: } } - void enableSharedType(ParameterType type, VectorPtr vec, MatType matType) { - if (!bufs_[type]) { - bufs_[type] = vec; - setMat(type, matType); - } - } - /// for batchGradientMachine: blockNum is number of partitions of the matrix. bool isGradShared(size_t* blockNum = NULL); @@ -203,20 +185,6 @@ public: const MatrixPtr& getMat(ParameterType pType) const { return mats_[pType]; } - const IVectorPtr& getIntBuf(ParameterType pType) { return intBufs_[pType]; } - - void setIntBuf(ParameterType pType, const IVectorPtr& iVec) { - intBufs_[pType] = iVec; - } - - SparsePrefetchRowCpuMatrix* getPrefetchMatrix(); - - float getLearnRate() const { return config_.learning_rate(); } - - float getInitMean() const { return config_.initial_mean(); } - - float getInitStandardDeviation() const { return config_.initial_std(); } - void setValueUpdated() { updated_ = true; } void clearValueUpdated() { updated_ = false; } @@ -243,8 +211,6 @@ public: */ bool load(std::istream& is); - std::vector& getGradientSegments() { return gradSegments_; } - void incShared() { sharedCount_++; } /** @@ -351,35 +317,21 @@ protected: int sharedCount_; int updateCounter_; - std::vector gradSegments_; // segments of non-zero gradient bool updated_; SparseFormat format_; - static ThreadLocal> tlsTempBufs_; - std::vector> updaterHooks_; public: - void setSharedCount(int cnt) { sharedCount_ = cnt; } int getSharedCount() { return sharedCount_; } - void singleUpdate(void* data); bool isSparse() { return config_.is_sparse(); } SparseFormat getFormat() { return format_; } static const std::string kMissParameterFail; static const std::string kMissParameterRand; static const std::string kMissParameterZero; - - static VectorPtr* getTlsTempBufs(); - - /** - * exec a func in single/multi thread. - * vecs is bufs_ of Parameter, as input of ExecFunc. - */ - typedef std::function ExecFunc; - void exec(ExecFunc func); }; typedef std::map ParameterMap; diff --git a/paddle/parameter/ThreadLocalBuffer.cpp b/paddle/parameter/ThreadLocalBuffer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b21dd15245cf7c3d0418d37e6e8925c9e906f482 --- /dev/null +++ b/paddle/parameter/ThreadLocalBuffer.cpp @@ -0,0 +1,35 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "ThreadLocalBuffer.h" +#include "Parameter.h" + +namespace paddle { +namespace parameter { + +static ThreadLocal> tlsTempBufs_; + +VectorPtr* getThreadLocalBuffer() { + std::vector& bufs = *tlsTempBufs_; + if (bufs.empty()) { + bufs.resize(NUM_PARAMETER_TYPES); + for (auto& vec : bufs) { + vec.reset(new CpuVector(0, nullptr)); + } + } + return bufs.data(); +} + +} // namespace parameter +} // namespace paddle diff --git a/paddle/parameter/ThreadLocalBuffer.h b/paddle/parameter/ThreadLocalBuffer.h new file mode 100644 index 0000000000000000000000000000000000000000..c916519c974a5bdeea407dcc1bc6d196756874ee --- /dev/null +++ b/paddle/parameter/ThreadLocalBuffer.h @@ -0,0 +1,22 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once +#include "paddle/math/Vector.h" + +namespace paddle { +namespace parameter { +extern VectorPtr* getThreadLocalBuffer(); +} // namespace parameter +} // namespace paddle diff --git a/paddle/pserver/ParameterClient2.cpp b/paddle/pserver/ParameterClient2.cpp index a97859f83fe6495b298e920346c964ef2a9b146c..f7e391f76324a09c203dfbbb449feb050caa8fb4 100644 --- a/paddle/pserver/ParameterClient2.cpp +++ b/paddle/pserver/ParameterClient2.cpp @@ -243,7 +243,8 @@ void ParameterClient2::prepareSendData( CHECK_GE(blockSize, 1LU) << "blockSize should > 0 " << blockSize; const auto paraSize = parameter->getSize(); if (sparseUpdate) { - const auto prefetchMat = parameter->getPrefetchMatrix(); + auto prefetchMat = std::dynamic_pointer_cast( + parameter->getMat(PARAMETER_VALUE)); CHECK(prefetchMat != nullptr) << "prefetchMat is nullptr"; auto sendMat = dynamic_cast( parameter->getMat(parameterType).get()); diff --git a/paddle/pserver/ParameterServer2.cpp b/paddle/pserver/ParameterServer2.cpp index 19ff40ba7e9584f772043f939bcb31caf666163d..41ac15336d3150417da1cf1631319604584991ec 100644 --- a/paddle/pserver/ParameterServer2.cpp +++ b/paddle/pserver/ParameterServer2.cpp @@ -18,7 +18,6 @@ limitations under the License. */ #include #include "paddle/math/SIMDFunctions.h" - #include "paddle/parameter/AverageOptimizer.h" #include "paddle/parameter/FirstOrderOptimizer.h" #include "paddle/parameter/OptimizerFunctions.h" @@ -26,6 +25,7 @@ limitations under the License. */ #include "paddle/parameter/ParameterOptimizer.h" #include "paddle/parameter/ParameterUpdateFunctions.h" #include "paddle/parameter/Regularizer.h" +#include "paddle/parameter/ThreadLocalBuffer.h" #include "paddle/utils/Flags.h" #include "paddle/utils/GlobalConstants.h" #include "paddle/utils/Stat.h" @@ -618,7 +618,7 @@ void ParameterServer2::asyncSGD(const SendParameterRequest& request, bool commitGradient = asyncGrdientCommitCheckAndStat(request); - VectorPtr* vecs = Parameter::getTlsTempBufs(); + VectorPtr* vecs = parameter::getThreadLocalBuffer(); size_t bufferIndex = 0; for (const auto& block : request.blocks()) { int64_t offset = getBlockOffset(block); @@ -1051,15 +1051,15 @@ void ParameterServer2::clearUnusedSegments(CpuVector* vec) { } void ParameterServer2::parallelExecForEachBlock(ExecFunc func) { - SyncThreadPool::execHelper(syncThreadPool_.get(), - [&](int tid, size_t numThreads) { - int64_t numBlocks = blockIdMap_.size(); - VectorPtr* vecs = Parameter::getTlsTempBufs(); - for (int64_t blockId = tid; blockId < numBlocks; - blockId += numThreads) { - func(blockId, vecs); - } - }); + SyncThreadPool::execHelper( + syncThreadPool_.get(), [&](int tid, size_t numThreads) { + int64_t numBlocks = blockIdMap_.size(); + VectorPtr* vecs = parameter::getThreadLocalBuffer(); + for (int64_t blockId = tid; blockId < numBlocks; + blockId += numThreads) { + func(blockId, vecs); + } + }); } void ParameterServer2::blockTraverse( diff --git a/paddle/py_paddle/dataprovider_converter.py b/paddle/py_paddle/dataprovider_converter.py index 7c6b83541002071d6e9d00c17be97b6ce4bf8528..edc2e0292378fea0cd904d7f017762c1dade6caf 100644 --- a/paddle/py_paddle/dataprovider_converter.py +++ b/paddle/py_paddle/dataprovider_converter.py @@ -17,6 +17,7 @@ import collections import swig_paddle import numpy import itertools +from functools import reduce __all__ = ['DataProviderConverter'] @@ -65,6 +66,8 @@ class IScanner(object): :param argument: Output arguments object. :type argument: swig_paddle.Arguments + :param dat: Output arguments object. + :type dat: The Python object, numpy.array or List. :return: """ pass @@ -95,17 +98,35 @@ class DenseScanner(IScanner): def __init__(self, input_type, pos): IScanner.__init__(self, input_type, pos) self.__mat__ = None + self.__shape__ = None self.__height__ = 0 + self.__dim__ = 0 def pre_scan(self, dat): self.__height__ += 1 + if self.__shape__ is None: + self.__shape__ = numpy.array(dat).shape + if len(self.__shape__) > 3: + raise ValueError( + "The dimension of input cannot be greater than 3.") + self.__dim__ = reduce(lambda x, y: x * y, self.__shape__) + if len(self.__shape__) == 1 and self.__dim__ != self.input_type.dim: + raise ValueError( + "The data size must be equal to it in data layer.") + else: + if self.__shape__ != numpy.array(dat).shape: + raise ValueError( + "The data shape must be same in one mini-batch.") def finish_pre_scan(self, argument): self.__mat__ = numpy.ndarray( - shape=(self.__height__, self.input_type.dim), dtype=numpy.float32) + shape=(self.__height__, self.__dim__), dtype=numpy.float32) self.__height__ = 0 def scan(self, dat): + # It's better to use NumPy array for speed. + dat = numpy.array(dat) + dat = dat.flatten() self.__mat__[self.__height__] = dat self.__height__ += 1 @@ -116,6 +137,14 @@ class DenseScanner(IScanner): m = swig_paddle.Matrix.createDenseFromNumpy(self.__mat__, True, self.data_in_gpu) argument.setSlotValue(self.pos, m) + if len(self.__shape__) > 1: + # The last-two dimenstions are the frame height and width. + # For example, the layout is CHW for 3-D feature of image. + # The H and W are the fram height and width. + h, w = self.__shape__[-2:] + argument.setSlotFrameHeight(self.pos, h) + argument.setSlotFrameWidth(self.pos, w) + self.__shape__ = None class SparseBinaryScanner(IScanner): diff --git a/paddle/trainer/RemoteParameterUpdater.cpp b/paddle/trainer/RemoteParameterUpdater.cpp index 6939738203f41e0c1f7204d54834e34b2cd90682..7314266cb24da9b9e9f0f1cbe61ed363247f51fe 100644 --- a/paddle/trainer/RemoteParameterUpdater.cpp +++ b/paddle/trainer/RemoteParameterUpdater.cpp @@ -747,28 +747,32 @@ void SparseRemoteParameterUpdater::getParametersRemote(bool fullSize, bool apply) { ParameterType sendBackParameterType = (useApplyInPserver_ && apply) ? PARAMETER_APPLY : PARAMETER_VALUE; + std::function getParams; + std::function applyL1; if (fullSize) { - parameterClient_->getParameter( - /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType); - if (config_.shrink_parameter_value() > 0) { - for (auto& para : parameters_) { - if (para->getConfig().decay_rate_l1() > 0) { - para->getBuf(PARAMETER_VALUE) - ->applyL1(1.0f, // learningRate - config_.shrink_parameter_value()); // decayRate - } - } - } + getParams = [&] { + parameterClient_->getParameter( + /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType); + }; + applyL1 = [](Parameter& para, real decayRate) { + para.getBuf(PARAMETER_VALUE)->applyL1(/*lr=*/1.0f, decayRate); + }; } else { - REGISTER_TIMER("getParamSparse"); - parameterClient_->getParameterSparse( - /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType); + getParams = [&] { + parameterClient_->getParameterSparse( + /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType); + }; + applyL1 = [](Parameter& para, real decayRate) { + para.getMat(PARAMETER_VALUE)->applyL1(/*lr=*/1.0f, decayRate); + }; + } + { + REGISTER_TIMER("getParamDenseAndSparse"); + getParams(); if (config_.shrink_parameter_value() > 0) { for (auto& para : parameters_) { if (para->getConfig().decay_rate_l1() > 0) { - para->getPrefetchMatrix()->applyL1Decay( - 1.0f, // learningRate - config_.shrink_parameter_value()); // decayRate + applyL1(*para, config_.shrink_parameter_value()); } } } diff --git a/paddle/trainer/ThreadParameterUpdater.cpp b/paddle/trainer/ThreadParameterUpdater.cpp index 870d4a4b0246fe244bbd3796ec14449eb181aad2..3c85c3aaac68fc29da90c24d1208887a17009d5f 100644 --- a/paddle/trainer/ThreadParameterUpdater.cpp +++ b/paddle/trainer/ThreadParameterUpdater.cpp @@ -17,6 +17,7 @@ limitations under the License. */ #include "paddle/utils/Logging.h" #include "paddle/math/SparseRowMatrix.h" +#include "paddle/parameter/ThreadLocalBuffer.h" #include "paddle/utils/Thread.h" DECLARE_int32(trainer_count); @@ -98,7 +99,7 @@ void SgdThreadUpdater::threadTraverse( int tid, size_t numThreads, Parameter* para) { - VectorPtr* vecs = Parameter::getTlsTempBufs(); + VectorPtr* vecs = parameter::getThreadLocalBuffer(); if (para->isGradSparseUpdate()) { size_t height = para->getConfig().dims(0); size_t width = para->getConfig().dims(1); @@ -214,7 +215,7 @@ void SgdThreadUpdater::threadUpdateSparse(int tid, Parameter* para) { int pid = para->getID(); ParameterOptimizer* optimizer = optimizers_[pid].get(); - VectorPtr* vecs = Parameter::getTlsTempBufs(); + VectorPtr* vecs = parameter::getThreadLocalBuffer(); size_t height = para->getConfig().dims(0); size_t width = para->getConfig().dims(1); @@ -286,7 +287,7 @@ void SgdThreadUpdater::threadUpdateDense(int tid, Parameter* para) { int pid = para->getID(); ParameterOptimizer* optimizer = optimizers_[pid].get(); - VectorPtr* vecs = Parameter::getTlsTempBufs(); + VectorPtr* vecs = parameter::getThreadLocalBuffer(); auto interval = calcSplitArrayInterval( para->getSize(), (size_t)tid, numThreads, 8LU /*for avx*/); diff --git a/paddle/utils/arch/linux/Locks.cpp b/paddle/utils/arch/linux/Locks.cpp index 310c9a6542563891d4ba5888e58406ea28d6a2ce..3a0903d1f268cf0132da3de43396391219edf004 100644 --- a/paddle/utils/arch/linux/Locks.cpp +++ b/paddle/utils/arch/linux/Locks.cpp @@ -55,8 +55,11 @@ public: }; #else - +// clang-format off +#include #include +// clang-format on + class SpinLockPrivate { public: inline void lock() { diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 4f52f0f6cfd7f3f3de150c8c874f044a427f7b98..3640dd3a75ea212a84255ea7f6369b63606482ab 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -24,15 +24,16 @@ add_custom_target(paddle_python ALL DEPENDS ${OUTPUT_DIR}/.timestamp) set(PADDLE_PYTHON_PACKAGE_DIR ${CMAKE_CURRENT_BINARY_DIR}/dist/) -add_subdirectory(paddle/trainer_config_helpers/tests) -if (WITH_SWIG_PY) - # enable v2 API unittest only when paddle swig api is compiled - add_subdirectory(paddle/v2/tests) - add_subdirectory(paddle/v2/reader/tests) - add_subdirectory(paddle/v2/plot/tests) +if (WITH_TESTING) + add_subdirectory(paddle/trainer_config_helpers/tests) + if (WITH_SWIG_PY) + # enable v2 API unittest only when paddle swig api is compiled + add_subdirectory(paddle/v2/tests) + add_subdirectory(paddle/v2/reader/tests) + add_subdirectory(paddle/v2/plot/tests) + endif() endif() - install(DIRECTORY ${PADDLE_PYTHON_PACKAGE_DIR} DESTINATION opt/paddle/share/wheels ) diff --git a/python/paddle/trainer/PyDataProvider2.py b/python/paddle/trainer/PyDataProvider2.py index a36f0ebfdcb9f90f54ba2d688f9f4bcee2939ef3..7e305e2cd9fbe306368a44d08f7f66b4185ae2d2 100644 --- a/python/paddle/trainer/PyDataProvider2.py +++ b/python/paddle/trainer/PyDataProvider2.py @@ -72,9 +72,16 @@ class InputType(object): def dense_slot(dim, seq_type=SequenceType.NO_SEQUENCE): """ - Dense Vector. It means the input feature is dense float vector. For example, - if the input is an image with 28*28 pixels, the input of Paddle neural - network should be a dense vector with dimension 784. + Dense Array. It means the input feature is dense array with float type. + For example, if the input is an image with 28*28 pixels, the input of + Paddle neural network could be a dense vector with dimension 784 or a + numpy array with shape (28, 28). + + For the 2-D convolution operation, each sample in one mini-batch must have + the similarly size in PaddlePaddle now. But, it supports variable-dimension + feature across mini-batch. For the variable-dimension, the param dim is not + used. While the data reader must yield numpy array and the data feeder will + set the data shape correctly. :param dim: dimension of this vector. :type dim: int @@ -135,6 +142,10 @@ sparse_binary_vector = sparse_non_value_slot sparse_vector = sparse_value_slot integer_value = index_slot +# dense_array can be used for variable-length input feature. +# Each feature is not a vector, but a multi-dimensional array. +dense_array = dense_slot + def dense_vector_sequence(dim): """ diff --git a/python/paddle/trainer_config_helpers/attrs.py b/python/paddle/trainer_config_helpers/attrs.py index 7ae9e5cb3050fa6f70fa84785a1ddbdc68c70235..d1167a234caed3753c6beedfc89b01054e3688e1 100644 --- a/python/paddle/trainer_config_helpers/attrs.py +++ b/python/paddle/trainer_config_helpers/attrs.py @@ -110,15 +110,16 @@ class ParameterAttribute(object): momentum=None, gradient_clipping_threshold=None, sparse_update=False): - # initialize strategy. + self.attr = {} + if is_static: - self.attr = {'is_static': True} - elif initial_std is None and initial_mean is None and initial_max \ + self.attr['is_static'] = True + + if initial_std is None and initial_mean is None and initial_max \ is None and initial_min is None: - self.attr = {'initial_smart': True} + self.attr['initial_smart'] = True elif is_compatible_with(initial_std, float) or \ is_compatible_with(initial_mean, float): - self.attr = dict() if initial_std is not None: self.attr['initial_std'] = initial_std if initial_mean is not None: @@ -131,7 +132,6 @@ class ParameterAttribute(object): assert initial_min < initial_max initial_mean = (initial_max + initial_min) / 2 initial_std = initial_mean - initial_min - self.attr = dict() self.attr['initial_mean'] = initial_mean self.attr['initial_std'] = initial_std self.attr['initial_strategy'] = 1 # Uniform Random diff --git a/python/paddle/trainer_config_helpers/config_parser.py b/python/paddle/trainer_config_helpers/config_parser.py deleted file mode 100644 index 4b91b8d2824cd89ac0d6da696492bd9289b6e5f4..0000000000000000000000000000000000000000 --- a/python/paddle/trainer_config_helpers/config_parser.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import paddle.trainer.config_parser as config_parser -''' -This file is a wrapper of formal config_parser. The main idea of this file is to -separete different config logic into different function, such as network configuration - and optimizer configuration. -''' - -__all__ = [ - "parse_trainer_config", "parse_network_config", "parse_optimizer_config" -] - - -def parse_trainer_config(trainer_conf, config_arg_str): - return config_parser.parse_config(trainer_conf, config_arg_str) - - -def parse_network_config(network_conf): - config = config_parser.parse_config(network_conf, '') - return config.model_config - - -def parse_optimizer_config(optimizer_conf): - config = config_parser.parse_config(optimizer_conf, '') - return config.opt_config diff --git a/python/paddle/v2/data_type.py b/python/paddle/v2/data_type.py index d582f76ddf01ed3430a1d075624bbb8e0bf3f2a9..226997465f2ec97c6224b248427739592e9694df 100644 --- a/python/paddle/v2/data_type.py +++ b/python/paddle/v2/data_type.py @@ -16,7 +16,8 @@ import paddle.trainer.PyDataProvider2 as pydp2 import_list = [ nm for nm in dir(pydp2) - if '_' in nm and nm[0] != '_' and ('value' in nm or 'vector' in nm) + if '_' in nm and nm[0] != '_' and ('value' in nm or 'vector' in nm or + 'array' in nm) ] import_list.extend(['InputType']) diff --git a/python/paddle/v2/layer.py b/python/paddle/v2/layer.py index 3d9caeec5897fcd5b9e084aff496d150efee2066..919c531d184b0a95ce8b456d57465b90eee5003e 100644 --- a/python/paddle/v2/layer.py +++ b/python/paddle/v2/layer.py @@ -360,7 +360,7 @@ mixed.__doc__ = conf_helps.mixed_layer.__doc__ class RecurrentLayerInput(Layer): - def __init__(self, recurrent_name, index, parent_layers): + def __init__(self, recurrent_name, index, parent_layers, reverse): parents_len = len(parent_layers) assert parents_len <= 1 if parents_len == 0: @@ -368,6 +368,7 @@ class RecurrentLayerInput(Layer): else: self.__parents__ = parent_layers.values()[0] self.__recurrent_name__ = recurrent_name + self.__reverse__ = reverse name = self.__parents__[ index].name if index >= 0 else self.context_name() super(RecurrentLayerInput, self).__init__( @@ -380,7 +381,8 @@ class RecurrentLayerInput(Layer): model_type('recurrent_nn') RecurrentLayerGroupWithoutOutLinksBegin( name=self.__recurrent_name__, - in_links=map(lambda x: x.name, self.__parents__)) + in_links=map(lambda x: x.name, self.__parents__), + seq_reversed=self.__reverse__) return self @@ -461,7 +463,7 @@ del each_layer_name @wrap_name_default() -def recurrent_group(step, input, name=None): +def recurrent_group(step, input, reverse=False, name=None): if not isinstance(input, collections.Sequence): input = [input] @@ -471,14 +473,14 @@ def recurrent_group(step, input, name=None): RecurrentLayerInput( recurrent_name=name, index=i, - parent_layers={'recurrent_inputs': non_static_inputs}) - for i in xrange(len(non_static_inputs)) + parent_layers={'recurrent_inputs': non_static_inputs}, + reverse=reverse) for i in xrange(len(non_static_inputs)) ] extra_input = None if len(non_static_inputs) == 0: extra_input = RecurrentLayerInput( - recurrent_name=name, index=-1, parent_layers={}) + recurrent_name=name, index=-1, parent_layers={}, reverse=reverse) def __real_step__(*args): rnn_input = list(args) diff --git a/python/paddle/v2/tests/test_data_feeder.py b/python/paddle/v2/tests/test_data_feeder.py index 71eb3bf31425c22b47accc11c9550042e077ef12..83da678da387ed1c86868847f140c6c09fbec3b5 100644 --- a/python/paddle/v2/tests/test_data_feeder.py +++ b/python/paddle/v2/tests/test_data_feeder.py @@ -233,6 +233,30 @@ class DataFeederTest(unittest.TestCase): self.assertEqual(out_sparse.getSparseRowCols(i), data[i][1]) self.assertEqual(out_index[i], data[i][0]) + def test_dense_set_shape(self): + # test 2-D data + def gen_data(batch_size, shape): + data = [] + for i in xrange(batch_size): + each_sample = [] + each_sample.append(np.random.random(shape)) + data.append(each_sample) + return data + + feeder = DataFeeder([('image', data_type.dense_array(2352))], + {'image': 0}) + arg = feeder(gen_data(32, (3, 28, 28))) + h = arg.getSlotFrameHeight(0) + w = arg.getSlotFrameWidth(0) + self.assertEqual(h, 28) + self.assertEqual(w, 28) + + arg = feeder(gen_data(32, (3, 30, 32))) + h = arg.getSlotFrameHeight(0) + w = arg.getSlotFrameWidth(0) + self.assertEqual(h, 30) + self.assertEqual(w, 32) + if __name__ == '__main__': api.initPaddle("--use_gpu=0") diff --git a/python/paddle/v2/tests/test_rnn_layer.py b/python/paddle/v2/tests/test_rnn_layer.py index 5fbbd20eb76bb9daab2bcf98c4adad989106a377..845277c01288f99f75a148ddab5895d00864f60c 100644 --- a/python/paddle/v2/tests/test_rnn_layer.py +++ b/python/paddle/v2/tests/test_rnn_layer.py @@ -42,7 +42,8 @@ class RNNTest(unittest.TestCase): def test(): data = conf_helps.data_layer(name="word", size=dict_dim) embd = conf_helps.embedding_layer(input=data, size=word_dim) - conf_helps.recurrent_group(name="rnn", step=step, input=embd) + conf_helps.recurrent_group( + name="rnn", step=step, input=embd, reverse=True) return str(parse_network(test)) @@ -60,7 +61,7 @@ class RNNTest(unittest.TestCase): name="word", type=data_type.integer_value(dict_dim)) embd = layer.embedding(input=data, size=word_dim) rnn_layer = layer.recurrent_group( - name="rnn", step=new_step, input=embd) + name="rnn", step=new_step, input=embd, reverse=True) return str(layer.parse_network(rnn_layer)) diff = difflib.unified_diff(parse_old_rnn().splitlines(1),