diff --git a/cmake/util.cmake b/cmake/util.cmake index b828eef322bc570c07f5c357353641117a094c16..8c9143462227e7081142f6be250b1a45e4b6d51b 100644 --- a/cmake/util.cmake +++ b/cmake/util.cmake @@ -149,8 +149,9 @@ endfunction() # Create a python unittest using run_python_tests.sh, # which takes care of making correct running environment function(add_python_test TEST_NAME) - add_test(NAME ${TEST_NAME} - COMMAND bash ${PROJ_ROOT}/paddle/scripts/run_python_tests.sh + add_test(NAME ${TEST_NAME} + COMMAND env PADDLE_PACKAGE_DIR=${PADDLE_PYTHON_PACKAGE_DIR} + bash ${PROJ_ROOT}/paddle/scripts/run_python_tests.sh ${USE_VIRTUALENV_FOR_TEST} ${PYTHON_EXECUTABLE} ${ARGN} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) endfunction() diff --git a/doc/design/cluster_train/master_server.md b/doc/design/cluster_train/master_server.md index bb8307652587b4dc56cd668a3a5e64722734d194..4bf3c506f101361875043f8bfd97972b8c981a22 100644 --- a/doc/design/cluster_train/master_server.md +++ b/doc/design/cluster_train/master_server.md @@ -10,7 +10,7 @@ A dataset is a list of files in *RecordIO* format. A RecordIO file consists of c ## Task Queue -As mentioned in [distributed training design doc](./README.md), a *task* is a data shard that the master server assigns to the trainer process to train on. A task consists of one or multiple *blocks* from one or multiple files. The master server maintains *task queues* to track the training progress. +As mentioned in [distributed training design doc](./README.md), a *task* is a data shard that the master server assigns to the trainer process to train on. A task consists of one or multiple *chunks* from one or multiple files. The master server maintains *task queues* to track the training progress. ### Task Queue Creation @@ -21,23 +21,23 @@ As mentioned in [distributed training design doc](./README.md), a *task* is a da func (m *RPCServer) ReportDataset(Paths []string, dummy *int) error { } ``` -1. The master server will scan through each RecordIO file to generate the *block index* and know how many blocks does each file have. A block can be referenced by the file path and the index of the block within the file. The block index is in memory data structure that enables fast access to each block, and the index of the block with the file is an integer start from 0, representing the n-th block within the file. +1. The master server will scan through each RecordIO file to generate the *chunk index* and know how many chunks does each file have. A chunk can be referenced by the file path and the index of the chunk within the file. The chunk index is in memory data structure that enables fast access to each chunk, and the index of the chunk with the file is an integer start from 0, representing the n-th chunk within the file. - The definition of the block is: + The definition of the chunk is: ```go - type Block struct { - Idx int // index of the block within the file + type Chunk struct { + Idx int // index of the chunk within the file Path string - Index recordio.Index // block index + Index recordio.Index // chunk index } ``` -1. Blocks are grouped into tasks, and tasks are filled into the todo queue. The pending queue and the done queue are initialized with no element. +1. Chunks are grouped into tasks, and tasks are filled into the todo queue. The pending queue and the done queue are initialized with no element. The definition of the task is: ```go type Task struct { Index int - Blocks []Block + Chunks []Chunk } ``` diff --git a/doc/design/cluster_train/pserver_client.md b/doc/design/cluster_train/pserver_client.md index 392bab25e9de6bf5aa7cc1b0ad345ef12f1d9e5d..007285640e9f11c55715291774826620419cec66 100644 --- a/doc/design/cluster_train/pserver_client.md +++ b/doc/design/cluster_train/pserver_client.md @@ -55,7 +55,7 @@ The trainer select process is encapsulated in the C API function: ```c int paddle_begin_init_params(paddle_pserver_client* client, const char* config_proto); ``` -The selected trainer's call to `paddle_begin_init_params` will return with 1, and the other trainers' call to `paddle_begin_init_params` will block until initialization is done, and return 0. As illustrated below: +The selected trainer's call to `paddle_begin_init_params` will return with 1, and the other trainers' call to `paddle_begin_init_params` will return 0. `paddle_get_params` will be blocked until initialization is completed. As illustrated below: @@ -89,16 +89,13 @@ void paddle_pserver_client_release(paddle_pserver_client* client); * * paddle_begin_init_params will be called from multiple trainers, * only one trainer will be selected to initialize the parameters on - * parameter servers. Other trainers will be blocked until the - * initialization is done, and they need to get the initialized + * parameter servers. Other trainers need to get the initialized * parameters from parameter servers using @paddle_get_params. * - * @param pserver_config_proto serialized parameter server configuration in - * Protocol Buffers format. * @return 1 if the trainer is selected to initialize parameter * servers, otherwise 0. */ -int paddle_begin_init_params(paddle_pserver_client* client, const char* pserver_config_proto); +int paddle_begin_init_params(paddle_pserver_client* client); /** * @brief paddle_init_param initializes the parameter on parameter @@ -106,12 +103,13 @@ int paddle_begin_init_params(paddle_pserver_client* client, const char* pserver_ * * @param param the parameter to initialize. * @param param_config_proto the configuration for the parameter. + * @param config_len the length of param_config_proto * @return 0 if successful, otherwise -1. On failure, the trainer * needs to restart the entire initialization process (starting from * @paddle_begin_init_param). Or simply exit the program and wait for * the cluster management system to restart the trainer. */ -int paddle_init_param(paddle_pserver_client* client, paddle_parameter params, const char* param_config_proto); +int paddle_init_param(paddle_pserver_client* client, paddle_parameter param, const unsigned char* param_config_proto, int config_len); /** * @brief paddle_finish_init_params tells parameter servers client has diff --git a/doc/design/cluster_train/src/pserver_init.graffle b/doc/design/cluster_train/src/pserver_init.graffle index 730d3a561ffdc19e723b3cf6612471440951826a..5f3f1f52be8aa7f9049a8fcd6b7c93c8560c1676 100644 Binary files a/doc/design/cluster_train/src/pserver_init.graffle and b/doc/design/cluster_train/src/pserver_init.graffle differ diff --git a/doc/design/cluster_train/src/pserver_init.png b/doc/design/cluster_train/src/pserver_init.png index 4d502226d82ba271c50ae1bec5efaaaac4cc4434..dfe491ff98dd7db1c336093c80964a260df2cd90 100644 Binary files a/doc/design/cluster_train/src/pserver_init.png and b/doc/design/cluster_train/src/pserver_init.png differ diff --git a/doc/design/parameters_in_cpp.md b/doc/design/parameters_in_cpp.md new file mode 100644 index 0000000000000000000000000000000000000000..b6f99bc7d9d6fafacb0a4bcff806b65d9aef98cc --- /dev/null +++ b/doc/design/parameters_in_cpp.md @@ -0,0 +1,41 @@ +# Design Doc: The C++ Class `Parameters` + +`Parameters` is a concept we designed in Paddle V2 API. `Parameters` is a container of parameters, and make Paddle can shared parameter between topologies. We described usages of `Parameter` in [api.md](./api.md). + +We used Python to implement Parameters when designing V2 API before. There are several defects for current implementation: +* We just use `memcpy` to share Parameters between topologies, but this is very inefficient. +* We did not implement share Parameters while training. We just trigger `memcpy` when start training. + +It is necessary that we implement Parameters in CPP side. However, it could be a code refactoring for Paddle, because Paddle was designed for training only one topology before, i.e., each GradientMachine contains its Parameter as a data member. In current Paddle implementation, there are three concepts associated with `Parameters`: + +1. `paddle::Parameter`. A `Parameters` is a container for `paddle::Parameter`. +It is evident that we should use `paddle::Parameter` when developing `Parameters`. +However, the `Parameter` class contains many functions and does not have a clear interface. +It contains `create/store Parameter`, `serialize/deserialize`, `optimize(i.e SGD)`, `randomize/zero`. +When we developing `Parameters`, we only use `create/store Parameter` functionality. +We should extract functionalities of Parameter into many classes to clean Paddle CPP implementation. + +2. `paddle::GradientMachine` and its sub-classes, e.g., `paddle::MultiGradientMachine`, `paddle::NeuralNetwork`. +We should pass `Parameters` to `paddle::GradientMachine` when `forward/backward` to avoid `memcpy` between topologies. +Also, we should handle multi-GPU/CPU training, because `forward` and `backward` would perform on multi-GPUs and multi-CPUs. +`Parameters` should dispatch the parameter value to each device, and gather the parameter gradient from each device. + +3. `paddle::ParameterUpdater`. The ParameterUpdater is used to update parameters in Paddle. +So `Parameters` should be used by `paddle::ParameterUpdater`, and `paddle::ParameterUpdater` should optimize `Parameters` (by SGD). + + +The step by step approach for implementation Parameters in Paddle C++ core is listed below. Each step should be a PR and could be merged into Paddle one by one. + +1. Clean `paddle::Parameter` interface. Extract the functionalities of `paddle::Parameter` to prepare for the implementation of Parameters. + +2. Implementation a `Parameters` class. It just stores the `paddle::Parameter` inside. Make `GradientMachine` uses `Parameters` as a class member. + +3. Make `Parameters` support Multi-CPU and Multi-GPU training to prepare for sharing `Parameter` between topologies. +Because we need share `Parameters` between topologies, it is `Parameters`'s response to exchange Parameters between GPUs. +`GradientMachine` should not handle how to exchange Parameters because `GradientMachine` only used to train one topology and we need to support train many topologies in Paddle, i.e., there could be many GradientMachines use one `Parameters`. + * We should use a global function to exchange Parameters between GPUs, not a member function in `Parameters`. The `MultiGradientMachine` invoke this function, which uses `Parameters` as this function inputs. + * The MultiGradientMachine contains many functionalities. Extracting the Parameters exchanging logic could make MultiGradientMachine clearer and simpler. + +4. Make `Parameters` as an argument for `forward/backward` function, not a data member for `GradientMachine`. For example, `forward` could be `forward(const Parameters& params, ...)` and `backward` could be `backward(Parameters* params, ...)`. After this step, Paddle could share `Parameters` between topologies. + +5. `ParameterUpdater` is invoked by `GradientMachine` and `Trainer`, but it updates `Parameters`. In the end of this code refactoring, we could change `ParameterUpdater` directly uses `Parameters` to make `ParameterUpdater`'s implementation clear. diff --git a/doc/design/speech/README.MD b/doc/design/speech/README.MD new file mode 100644 index 0000000000000000000000000000000000000000..7304650e628dba210488cd2dc4836318b5383b2a --- /dev/null +++ b/doc/design/speech/README.MD @@ -0,0 +1,155 @@ +# DeepSpeech2 on PaddlePaddle: Design Doc + +We are planning to build Deep Speech 2 (DS2) \[[1](#references)\], a powerful Automatic Speech Recognition (ASR) engine, on PaddlePaddle. For the first-stage plan, we have the following short-term goals: + +- Release a basic distributed implementation of DS2 on PaddlePaddle. +- Contribute a chapter of Deep Speech to PaddlePaddle Book. + +Intensive system optimization and low-latency inference library (details in \[[1](#references)\]) are not yet covered in this first-stage plan. + +## Table of Contents + +- [Tasks](#tasks) +- [Task Dependency](#task-dependency) +- [Design Details](#design-details) + - [Overview](#overview) + - [Row Convolution](#row-convolution) + - [Beam Search With CTC and LM](#beam-search-with-ctc-and-lm) +- [Future Work](#future-work) +- [References](#references) + +## Tasks + +We roughly break down the project into 14 tasks: + +1. Develop an **audio data provider**: + - Json filelist generator. + - Audio file format transformer. + - Spectrogram feature extraction, power normalization etc. + - Batch data reader with SortaGrad. + - Data augmentation (optional). + - Prepare (one or more) public English data sets & baseline. +2. Create a **simplified DS2 model configuration**: + - With only fixed-length (by padding) audio sequences (otherwise need *Task 3*). + - With only bidirectional-GRU (otherwise need *Task 4*). + - With only greedy decoder (otherwise need *Task 5, 6*). +3. Develop to support **variable-shaped** dense-vector (image) batches of input data. + - Update `DenseScanner` in `dataprovider_converter.py`, etc. +4. Develop a new **lookahead-row-convolution layer** (See \[[1](#references)\] for details): + - Lookahead convolution windows. + - Within-row convolution, without kernels shared across rows. +5. Build KenLM **language model** (5-gram) for beam search decoder: + - Use KenLM toolkit. + - Prepare the corpus & train the model. + - Create infererence interfaces (for Task 6). +6. Develop a **beam search decoder** with CTC + LM + WORDCOUNT: + - Beam search with CTC. + - Beam search with external custom scorer (e.g. LM). + - Try to design a more general beam search interface. +7. Develop a **Word Error Rate evaluator**: + - update `ctc_error_evaluator`(CER) to support WER. +8. Prepare internal dataset for Mandarin (optional): + - Dataset, baseline, evaluation details. + - Particular data preprocessing for Mandarin. + - Might need cooperating with the Speech Department. +9. Create **standard DS2 model configuration**: + - With variable-length audio sequences (need *Task 3*). + - With unidirectional-GRU + row-convolution (need *Task 4*). + - With CTC-LM beam search decoder (need *Task 5, 6*). +10. Make it run perfectly on **clusters**. +11. Experiments and **benchmarking** (for accuracy, not efficiency): + - With public English dataset. + - With internal (Baidu) Mandarin dataset (optional). +12. Time **profiling** and optimization. +13. Prepare **docs**. +14. Prepare PaddlePaddle **Book** chapter with a simplified version. + +## Task Dependency + +Tasks parallelizable within phases: + +Roadmap | Description | Parallelizable Tasks +----------- | :------------------------------------ | :-------------------- +Phase I | Simplified model & components | *Task 1* ~ *Task 8* +Phase II | Standard model & benchmarking & profiling | *Task 9* ~ *Task 12* +Phase III | Documentations | *Task13* ~ *Task14* + +Issue for each task will be created later. Contributions, discussions and comments are all highly appreciated and welcomed! + +## Design Details + +### Overview + +Traditional **ASR** (Automatic Speech Recognition) pipelines require great human efforts devoted to elaborately tuning multiple hand-engineered components (e.g. audio feature design, accoustic model, pronuncation model and language model etc.). **Deep Speech 2** (**DS2**) \[[1](#references)\], however, trains such ASR models in an end-to-end manner, replacing most intermediate modules with only a single deep network architecture. With scaling up both the data and model sizes, DS2 achieves a very significant performance boost. + +Please read Deep Speech 2 \[[1](#references),[2](#references)\] paper for more background knowledge. + +The classical DS2 network contains 15 layers (from bottom to top): + +- **Two** data layers (audio spectrogram, transcription text) +- **Three** 2D convolution layers +- **Seven** uni-directional simple-RNN layers +- **One** lookahead row convolution layers +- **One** fully-connected layers +- **One** CTC-loss layer + +
+
+Figure 1. Archetecture of Deep Speech 2 Network. +
+ +We don't have to persist on this 2-3-7-1-1-1 depth \[[2](#references)\]. Similar networks with different depths might also work well. As in \[[1](#references)\], authors use a different depth (e.g. 2-2-3-1-1-1) for final experiments. + +Key ingredients about the layers: + +- **Data Layers**: + - Frame sequences data of audio **spectrogram** (with FFT). + - Token sequences data of **transcription** text (labels). + - These two type of sequences do not have the same lengthes, thus a CTC-loss layer is required. +- **2D Convolution Layers**: + - Not only temporal convolution, but also **frequency convolution**. Like a 2D image convolution, but with a variable dimension (i.e. temporal dimension). + - With striding for only the first convlution layer. + - No pooling for all convolution layers. +- **Uni-directional RNNs** + - Uni-directional + row convolution: for low-latency inference. + - Bi-direcitional + without row convolution: if we don't care about the inference latency. +- **Row convolution**: + - For looking only a few steps ahead into the feature, instead of looking into a whole sequence in bi-directional RNNs. + - Not nessesary if with bi-direcitional RNNs. + - "**Row**" means convolutions are done within each frequency dimension (row), and no convolution kernels shared across. +- **Batch Normalization Layers**: + - Added to all above layers (except for data and loss layer). + - Sequence-wise normalization for RNNs: BatchNorm only performed on input-state projection and not state-state projection, for efficiency consideration. + + +Required Components | PaddlePaddle Support | Need to Develop +:------------------------------------- | :-------------------------------------- | :----------------------- +Data Layer I (Spectrogram) | Not supported yet. | TBD (Task 3) +Data Layer II (Transcription) | `paddle.data_type.integer_value_sequence` | - +2D Convolution Layer | `paddle.layer.image_conv_layer` | - +DataType Converter (vec2seq) | `paddle.layer.block_expand` | - +Bi-/Uni-directional RNNs | `paddle.layer.recurrent_group` | - +Row Convolution Layer | Not supported yet. | TBD (Task 4) +CTC-loss Layer | `paddle.layer.warp_ctc` | - +Batch Normalization Layer | `paddle.layer.batch_norm` | - +CTC-Beam search | Not supported yet. | TBD (Task 6) + +### Row Convolution + +TODO by Assignees + +### Beam Search with CTC and LM + +TODO by Assignees + +## Future Work + +- Efficiency Improvement +- Accuracy Improvement +- Low-latency Inference Library +- Large-scale benchmarking + +## References + +1. Dario Amodei, etc., [Deep Speech 2 : End-to-End Speech Recognition in English and Mandarin](http://proceedings.mlr.press/v48/amodei16.pdf). ICML 2016. +2. Dario Amodei, etc., [Deep Speech 2 : End-to-End Speech Recognition in English and Mandarin](https://arxiv.org/abs/1512.02595). arXiv:1512.02595. diff --git a/doc/design/speech/image/ds2_network.png b/doc/design/speech/image/ds2_network.png new file mode 100644 index 0000000000000000000000000000000000000000..1a5b2184d47928cc2849d5a7c8ea2d8cf5337e11 Binary files /dev/null and b/doc/design/speech/image/ds2_network.png differ diff --git a/paddle/go/cmd/master/master.go b/paddle/go/cmd/master/master.go new file mode 100644 index 0000000000000000000000000000000000000000..ef1f87c2dd53b701810c82ae90eaf3f94ea15e47 --- /dev/null +++ b/paddle/go/cmd/master/master.go @@ -0,0 +1,93 @@ +package main + +import ( + "fmt" + "net" + "net/http" + "net/rpc" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/namsral/flag" + + "github.com/PaddlePaddle/Paddle/paddle/go/master" + "github.com/PaddlePaddle/Paddle/paddle/go/recordio" +) + +func main() { + port := flag.Int("port", 8080, "port of the master server.") + dataset := flag.String("training_dataset", "", "dataset: comma separated path to RecordIO paths, supports golb patterns.") + faultTolerance := flag.Bool("fault_tolerance", false, "enable fault tolerance (requires etcd).") + taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timout duration.") + taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timtout count for each task before it being declared failed task.") + chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.") + flag.Parse() + + if *dataset == "" { + panic("no dataset specified.") + } + + if *faultTolerance { + panic("fault tolernance not implemented.") + } + + var chunks []master.Chunk + var paths []string + ss := strings.Split(*dataset, ",") + fmt.Println(ss) + for _, s := range ss { + match, err := filepath.Glob(s) + if err != nil { + panic(err) + } + paths = append(paths, match...) + } + + if len(paths) == 0 { + panic("no valid datset specified.") + } + + idx := 0 + for _, path := range paths { + f, err := os.Open(path) + if err != nil { + panic(err) + } + + index, err := recordio.LoadIndex(f) + if err != nil { + panic(err) + } + f.Close() + + count := index.NumChunks() + for i := 0; i < count; i++ { + chunk := master.Chunk{ + Idx: idx, + Path: path, + Index: *index.ChunkIndex(i), + } + chunks = append(chunks, chunk) + } + } + + s := master.NewService(chunks, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax) + err := rpc.Register(s) + if err != nil { + panic(err) + } + + rpc.HandleHTTP() + l, err := net.Listen("tcp", ":"+strconv.Itoa(*port)) + if err != nil { + panic(err) + } + + err = http.Serve(l, nil) + if err != nil { + panic(err) + } +} diff --git a/paddle/go/cmd/pserver/pserver.go b/paddle/go/cmd/pserver/pserver.go index 41417875fb98aca3f2181841b28f7b220e948618..bd4bfc7028302df1c3e6ecd3cc9ebb11b158df01 100644 --- a/paddle/go/cmd/pserver/pserver.go +++ b/paddle/go/cmd/pserver/pserver.go @@ -1,17 +1,18 @@ package main import ( - "flag" "net" "net/http" "net/rpc" "strconv" + "github.com/namsral/flag" + "github.com/PaddlePaddle/Paddle/paddle/go/pserver" ) func main() { - port := flag.Int("p", 0, "port of the pserver") + port := flag.Int("port", 0, "port of the pserver") flag.Parse() s := pserver.NewService() diff --git a/paddle/go/master/service.go b/paddle/go/master/service.go new file mode 100644 index 0000000000000000000000000000000000000000..75266482870c448fcde7359640bc4773c200fecb --- /dev/null +++ b/paddle/go/master/service.go @@ -0,0 +1,178 @@ +package master + +import ( + "errors" + "log" + "sync" + "time" + + "github.com/PaddlePaddle/Paddle/paddle/go/recordio" +) + +const ( + targetTaskCount = 300 +) + +// errors +var ( + ErrNoMoreTask = errors.New("no more task for current pass") + ErrPendingTaskNotFound = errors.New("pending task not found") +) + +// Service is the master server service. +type Service struct { + timeoutDur time.Duration + timeoutMax int + + mu sync.Mutex + taskQueues taskQueues +} + +// Recover recovers service state from etcd. +func Recover() (*Service, error) { + // TODO(helin): recover from snapshot state from etcd. + return nil, nil +} + +func partition(chunks []Chunk, chunksPerTask int) []taskEntry { + id := 0 + if chunksPerTask <= 0 { + chunksPerTask = 1 + } + + var result []taskEntry + var cur taskEntry + for i, c := range chunks { + if i%chunksPerTask == 0 && len(cur.Task.Chunks) > 0 { + cur.Task.ID = id + id++ + result = append(result, cur) + cur.Task.Chunks = nil + } + + cur.Task.Chunks = append(cur.Task.Chunks, c) + } + + if len(cur.Task.Chunks) > 0 { + cur.Task.ID = id + id++ + result = append(result, cur) + } + + return result +} + +// NewService creates a new service. +func NewService(chunks []Chunk, chunksPerTask int, timeoutDur time.Duration, timeoutMax int) *Service { + s := &Service{} + s.timeoutDur = timeoutDur + s.timeoutMax = timeoutMax + s.taskQueues = taskQueues{} + s.taskQueues.Pending = make(map[int]taskEntry) + s.taskQueues.Todo = partition(chunks, chunksPerTask) + return s +} + +// Chunk is a chunk of data consisted of several data instances. +type Chunk struct { + Idx int // index of the chunk within the file + Path string + Index recordio.Index // block index +} + +// Task is the basic unit of data instances assigned to trainers. +type Task struct { + ID int + Chunks []Chunk +} + +type taskEntry struct { + Epoch int + NumTimeout int + Task Task +} + +type taskQueues struct { + Todo []taskEntry + Pending map[int]taskEntry // map from task ID to task entry + Done []taskEntry + Failed []Task +} + +// *must* be called with s.mu being held. +func (s *Service) snapshot() error { + // TODO(helin): snapshot state on etcd. + return nil +} + +// GetTask gets a new task from the service. +func (s *Service) GetTask(dummy int, task *Task) error { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.taskQueues.Todo) == 0 { + return ErrNoMoreTask + } + + t := s.taskQueues.Todo[0] + t.Epoch++ + s.taskQueues.Todo = s.taskQueues.Todo[1:] + s.taskQueues.Pending[t.Task.ID] = t + err := s.snapshot() + if err != nil { + return err + } + + time.AfterFunc(s.timeoutDur, func(taskID int, epoch int) func() { + return func() { + s.mu.Lock() + defer s.mu.Unlock() + + t, ok := s.taskQueues.Pending[taskID] + if !ok { + return + } + + if t.Epoch != epoch { + // new epoch, task launched after the + // schedule of this timeout check. + return + } + + defer func() { + err := s.snapshot() + if err != nil { + log.Println(err) + } + }() + + delete(s.taskQueues.Pending, t.Task.ID) + + t.NumTimeout++ + if t.NumTimeout > s.timeoutMax { + s.taskQueues.Failed = append(s.taskQueues.Failed, t.Task) + return + } + + s.taskQueues.Todo = append(s.taskQueues.Todo, t) + } + }(t.Task.ID, t.Epoch)) + return nil +} + +// TaskFinished tell the service that a task is finished. +func (s *Service) TaskFinished(taskID int, dummy *int) error { + s.mu.Lock() + defer s.mu.Unlock() + + t, ok := s.taskQueues.Pending[taskID] + if !ok { + return ErrPendingTaskNotFound + } + + // task finished, reset timeout + t.NumTimeout = 0 + s.taskQueues.Done = append(s.taskQueues.Done, t) + delete(s.taskQueues.Pending, taskID) + return s.snapshot() +} diff --git a/paddle/go/master/service_internal_test.go b/paddle/go/master/service_internal_test.go new file mode 100644 index 0000000000000000000000000000000000000000..bc435b505c014ca13ed5fc16b33a21ebb089a3b7 --- /dev/null +++ b/paddle/go/master/service_internal_test.go @@ -0,0 +1,37 @@ +package master + +import "testing" + +func TestPartitionCount(t *testing.T) { + cs := make([]Chunk, 100) + ts := partition(cs, 5) + if len(ts) != 20 { + t.Error(len(ts)) + } + + cs = make([]Chunk, 101) + ts = partition(cs, 5) + if len(ts) != 21 { + t.Error(len(ts)) + } + + ts = partition(cs, 1) + if len(ts) != 101 { + t.Error(len(ts)) + } + + ts = partition(cs, 0) + if len(ts) != 101 { + t.Error(len(ts)) + } +} + +func TestPartionIndex(t *testing.T) { + cs := make([]Chunk, 100) + ts := partition(cs, 20) + for i := range ts { + if ts[i].Task.ID != i { + t.Error(ts[i], i) + } + } +} diff --git a/paddle/scripts/run_python_tests.sh b/paddle/scripts/run_python_tests.sh index c588b9e08def5510e5daea2fd4e18f66c5f89bb8..1ed497aaeccdb629181809a0cbc48abb57ae4c44 100755 --- a/paddle/scripts/run_python_tests.sh +++ b/paddle/scripts/run_python_tests.sh @@ -24,12 +24,21 @@ PYTHON=$1; shift if [ $USE_VIRTUALENV_FOR_TEST -ne 0 ]; then rm -rf .test_env virtualenv .test_env + unset PYTHONHOME + unset PYTHONPATH source .test_env/bin/activate PYTHON=python fi -export PYTHONPATH=$SCRIPTPATH/../../python/ -$PYTHON -m pip install $SCRIPTPATH/../dist/*.whl requests matplotlib opencv-python ipython==5.3 rarfile +$PYTHON -m pip install $SCRIPTPATH/../dist/*.whl + +if [ "X${PADDLE_PACKAGE_DIR}" != "X" ]; then + $PYTHON -m pip install ${PADDLE_PACKAGE_DIR}/*.whl +else + export PYTHONPATH=$SCRIPTPATH/../../python/ +fi + +$PYTHON -m pip install ipython==5.3 for fn in "$@" do diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index bfa19d5ecc84a08614852c4c93de5b5793c1be9c..4f52f0f6cfd7f3f3de150c8c874f044a427f7b98 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -23,7 +23,9 @@ add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp add_custom_target(paddle_python ALL DEPENDS ${OUTPUT_DIR}/.timestamp) +set(PADDLE_PYTHON_PACKAGE_DIR ${CMAKE_CURRENT_BINARY_DIR}/dist/) add_subdirectory(paddle/trainer_config_helpers/tests) + if (WITH_SWIG_PY) # enable v2 API unittest only when paddle swig api is compiled add_subdirectory(paddle/v2/tests) @@ -31,6 +33,6 @@ if (WITH_SWIG_PY) add_subdirectory(paddle/v2/plot/tests) endif() -install(DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/dist/ +install(DIRECTORY ${PADDLE_PYTHON_PACKAGE_DIR} DESTINATION opt/paddle/share/wheels ) diff --git a/python/paddle/trainer_config_helpers/attrs.py b/python/paddle/trainer_config_helpers/attrs.py index 7ae9e5cb3050fa6f70fa84785a1ddbdc68c70235..d1167a234caed3753c6beedfc89b01054e3688e1 100644 --- a/python/paddle/trainer_config_helpers/attrs.py +++ b/python/paddle/trainer_config_helpers/attrs.py @@ -110,15 +110,16 @@ class ParameterAttribute(object): momentum=None, gradient_clipping_threshold=None, sparse_update=False): - # initialize strategy. + self.attr = {} + if is_static: - self.attr = {'is_static': True} - elif initial_std is None and initial_mean is None and initial_max \ + self.attr['is_static'] = True + + if initial_std is None and initial_mean is None and initial_max \ is None and initial_min is None: - self.attr = {'initial_smart': True} + self.attr['initial_smart'] = True elif is_compatible_with(initial_std, float) or \ is_compatible_with(initial_mean, float): - self.attr = dict() if initial_std is not None: self.attr['initial_std'] = initial_std if initial_mean is not None: @@ -131,7 +132,6 @@ class ParameterAttribute(object): assert initial_min < initial_max initial_mean = (initial_max + initial_min) / 2 initial_std = initial_mean - initial_min - self.attr = dict() self.attr['initial_mean'] = initial_mean self.attr['initial_std'] = initial_std self.attr['initial_strategy'] = 1 # Uniform Random diff --git a/python/paddle/v2/layer.py b/python/paddle/v2/layer.py index 3d9caeec5897fcd5b9e084aff496d150efee2066..919c531d184b0a95ce8b456d57465b90eee5003e 100644 --- a/python/paddle/v2/layer.py +++ b/python/paddle/v2/layer.py @@ -360,7 +360,7 @@ mixed.__doc__ = conf_helps.mixed_layer.__doc__ class RecurrentLayerInput(Layer): - def __init__(self, recurrent_name, index, parent_layers): + def __init__(self, recurrent_name, index, parent_layers, reverse): parents_len = len(parent_layers) assert parents_len <= 1 if parents_len == 0: @@ -368,6 +368,7 @@ class RecurrentLayerInput(Layer): else: self.__parents__ = parent_layers.values()[0] self.__recurrent_name__ = recurrent_name + self.__reverse__ = reverse name = self.__parents__[ index].name if index >= 0 else self.context_name() super(RecurrentLayerInput, self).__init__( @@ -380,7 +381,8 @@ class RecurrentLayerInput(Layer): model_type('recurrent_nn') RecurrentLayerGroupWithoutOutLinksBegin( name=self.__recurrent_name__, - in_links=map(lambda x: x.name, self.__parents__)) + in_links=map(lambda x: x.name, self.__parents__), + seq_reversed=self.__reverse__) return self @@ -461,7 +463,7 @@ del each_layer_name @wrap_name_default() -def recurrent_group(step, input, name=None): +def recurrent_group(step, input, reverse=False, name=None): if not isinstance(input, collections.Sequence): input = [input] @@ -471,14 +473,14 @@ def recurrent_group(step, input, name=None): RecurrentLayerInput( recurrent_name=name, index=i, - parent_layers={'recurrent_inputs': non_static_inputs}) - for i in xrange(len(non_static_inputs)) + parent_layers={'recurrent_inputs': non_static_inputs}, + reverse=reverse) for i in xrange(len(non_static_inputs)) ] extra_input = None if len(non_static_inputs) == 0: extra_input = RecurrentLayerInput( - recurrent_name=name, index=-1, parent_layers={}) + recurrent_name=name, index=-1, parent_layers={}, reverse=reverse) def __real_step__(*args): rnn_input = list(args) diff --git a/python/paddle/v2/plot/tests/CMakeLists.txt b/python/paddle/v2/plot/tests/CMakeLists.txt index b1132f131737e26bfeeb31f6b3f062710bdf6f75..da5cd764889b48a3af8461a2793d948aa609d6c1 100644 --- a/python/paddle/v2/plot/tests/CMakeLists.txt +++ b/python/paddle/v2/plot/tests/CMakeLists.txt @@ -1 +1,5 @@ -add_python_test(test_ploter test_ploter.py) +if (NOT APPLE) + # The Mac OS X backend will not be able to function correctly if Python is + # not installed as a framework. + add_python_test(test_ploter test_ploter.py) +endif() diff --git a/python/paddle/v2/tests/test_rnn_layer.py b/python/paddle/v2/tests/test_rnn_layer.py index 5fbbd20eb76bb9daab2bcf98c4adad989106a377..845277c01288f99f75a148ddab5895d00864f60c 100644 --- a/python/paddle/v2/tests/test_rnn_layer.py +++ b/python/paddle/v2/tests/test_rnn_layer.py @@ -42,7 +42,8 @@ class RNNTest(unittest.TestCase): def test(): data = conf_helps.data_layer(name="word", size=dict_dim) embd = conf_helps.embedding_layer(input=data, size=word_dim) - conf_helps.recurrent_group(name="rnn", step=step, input=embd) + conf_helps.recurrent_group( + name="rnn", step=step, input=embd, reverse=True) return str(parse_network(test)) @@ -60,7 +61,7 @@ class RNNTest(unittest.TestCase): name="word", type=data_type.integer_value(dict_dim)) embd = layer.embedding(input=data, size=word_dim) rnn_layer = layer.recurrent_group( - name="rnn", step=new_step, input=embd) + name="rnn", step=new_step, input=embd, reverse=True) return str(layer.parse_network(rnn_layer)) diff = difflib.unified_diff(parse_old_rnn().splitlines(1), diff --git a/python/setup.py.in b/python/setup.py.in index 1afaffd261702738cdd552636a26dfcfbfe5ac48..d1c38823080fb3a5c879d8b59cb5371c07902e57 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -19,6 +19,7 @@ setup(name='paddle', "numpy", "protobuf==${PROTOBUF_VERSION}", "matplotlib", + "opencv-python", "rarfile" ], packages=packages,