提交 17842e33 编写于 作者: C chengduoZH

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into...

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into feature/expose_CUDAPinnedPlace_to_python
......@@ -5,10 +5,10 @@ In a large scale machine learning setup where the size of the training data is h
Polyak and Juditsky (1992) showed that the test performance of simple average of parameters obtained by Stochastic Gradient Descent (SGD) is as good as that of parameter values that are obtained by training the model over and over again, over the training dataset.
Hence, to accelerate the speed of Stochastic Gradient Descent, Averaged Stochastic Gradient Descent (ASGD) was proposed in Polyak and Juditsky (1992). For ASGD, the running average of parameters obtained by SGD, is used as the estimator for <img src="./images/theta_star.gif"/><br/> . The averaging is done as follows:
Hence, to accelerate the speed of Stochastic Gradient Descent, Averaged Stochastic Gradient Descent (ASGD) was proposed in Polyak and Juditsky (1992). For ASGD, the running average of parameters obtained by SGD, is used as the estimator for <img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/theta_star.gif"/><br/> . The averaging is done as follows:
<p align="center">
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/asgd.gif"><br />
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/asgd.gif"><br />
</p>
We propose averaging for any optimizer similar to how ASGD performs it, as mentioned above.
......
......@@ -114,13 +114,13 @@ current thread under two conditions:
#### Channel Send
<p align="center">
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/channel_send.png"/><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/channel_send.png"/><br/>
</p>
#### Channel Receive
<p align="center">
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/channel_recv.png"/><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/channel_recv.png"/><br/>
</p>
## Limitations and Considerations
......
......@@ -23,21 +23,25 @@ The following table compares concepts in Fluid and Go
<td>user-defined functions </td>
<td>
<a href="https://github.com/PaddlePaddle/Paddle/tree/develop/python/paddle/fluid">layers</a></td>
<td></td>
</tr>
<tr>
<td>control-flow and built-in functions </td>
<td>
<a href="https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/operators">intrinsics/operators</a></td>
<td></td>
</tr>
<tr>
<td>goroutines, channels </td>
<td>
<a href="https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/framework/thread_pool.h">class ThreadPool</a></td>
<td></td>
</tr>
<tr>
<td>runtime </td>
<td>
<a href="https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/executor.h">class Executor</a></td>
<td></td>
</tr>
</tbody>
</table>
......
......@@ -254,7 +254,7 @@ only one case will be executed.
### select_op flow
<p align="center">
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/select_op_workflow.png"/><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/select_op_workflow.png"/><br/>
</p>
The select algorithm is inspired by golang's select routine. Please refer to
......
......@@ -40,11 +40,11 @@ computation is only specified in Python code which sits outside of PaddlePaddle,
Similar to how a compiler uses an intermediate representation (IR) so that the programmer does not need to manually optimize their code for most of the cases, we can have an intermediate representation in PaddlePaddle as well. The compiler optimizes the IR as follows:
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/compiler.png"/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/compiler.png"/>
PaddlePaddle can support model parallelism by converting the IR so that the user no longer needs to manually perform the computation and operations in the Python component:
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/paddle-compile.png"/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/paddle-compile.png"/>
The IR for PaddlePaddle after refactoring is called a `Block`, it specifies the computation dependency graph and the variables used in the computation.
......@@ -60,7 +60,7 @@ For a detailed explanation, refer to this document -
The revamped distributed training architecture can address the above discussed limitations. Below is the illustration of how it does so:
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/distributed_architecture.png"/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/distributed_architecture.png"/>
The major components are: *Python API*, *Distribute Transpiler* and *Remote Executor*.
......@@ -152,7 +152,7 @@ for data in train_reader():
`JobDesc` object describe the distributed job resource specification to run on
Cluster environment.
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/remote_executor.png" width="500" align="center" />
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/remote_executor.png" width="500" align="center" />
`RemoteExecutor.run` sends the `ProgramDesc` and
[TrainingJob](https://github.com/PaddlePaddle/cloud/blob/unreleased-tpr/doc/autoscale/README.md#training-job-resource)
......@@ -171,7 +171,7 @@ In the future, a more general placement algorithm should be implemented, which m
The local training architecture will be the same as the distributed training architecture, the difference is that everything runs locally, and there is just one PaddlePaddle runtime:
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/local_architecture.png"/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/local_architecture.png"/>
### Training Data
......
......@@ -8,11 +8,11 @@ Op graph to a multi-CPU Op graph, and run `ParallelDo` Op to run the graph.
## Transpiler
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/single-thread@3x.png" width="300">
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/single-thread@3x.png" width="300">
After converted:
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/multi-threads@3x.png" width="1000">
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/multi-threads@3x.png" width="1000">
## Implement
......
......@@ -41,11 +41,11 @@ We will need these OPs: *Send*, *Recv*, *Enqueue*, *Dequeue*.
Below is an example of converting the user defined graph to the
subgraphs for the trainer and the parameter server:
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/local-graph.png" width="300"/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/local-graph.png" width="300"/>
After converting:
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/dist-graph.png" width="700"/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/dist-graph.png" width="700"/>
1. The parameter variable W and its optimizer program are placed on the parameter server.
1. Operators are added to the program.
......@@ -69,8 +69,7 @@ In Fluid, we introduce [SelectedRows](../selected_rows.md) to represent a list o
non-zero gradient data. So when we do parameter optimization both locally and remotely,
we only need to send those non-zero rows to the optimizer operators:
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/sparse_update.png" width="700" />
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/sparse_update.png" width="700" />
### Benefits
- Model parallelism becomes easier to implement: it is an extension to
......
......@@ -5,7 +5,7 @@ This document describes the RNN (Recurrent Neural Network) operator and how it i
## RNN Algorithm Implementation
<p align="center">
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/rnn.jpg"/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/rnn.jpg"/>
</p>
The above diagram shows an RNN unrolled into a full network.
......
......@@ -66,7 +66,7 @@ As most C++ operators do, `batch_norm_op` is defined by inputs, outputs, attribu
The following graph showes the training computational process of `batch_norm_op`:
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/batch_norm_op_kernel.png" width="800"/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/batch_norm_op_kernel.png" width="800"/>
cudnn provides APIs to finish the whole series of computation, we can use them in our GPU kernel.
......@@ -124,7 +124,7 @@ for pass_id in range(PASS_NUM):
`is_infer` is an attribute. Once an operator is created, its attributes can not be changed. It suggests us that we shall maintain two `batch_norm_op` in the model, one's `is_infer` is `True`(we call it `infer_batch_norm_op`) and the other one's is `False`(we call it `train_batch_norm_op`). They share all parameters and variables, but be placed in two different branches. That is to say, if a network contains a `batch_norm_op`, it will fork into two branches, one go through `train_batch_norm_op` and the other one go through `infer_batch_norm_op`:
<div align=center>
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/batch_norm_fork.png" width="500"/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/batch_norm_fork.png" width="500"/>
</div>
Just like what is shown in the above graph, the net forks before `batch_norm_op` and will never merge again. All the operators after `batch_norm_op` will duplicate.
......
......@@ -6,17 +6,17 @@ A central problem in machine learning is how to design an algorithm that will pe
### Parameter Norm Penalties
Most common regularization approaches in deep learning are based on limiting the capacity of the models by adding a parameter norm penalty to the objective function `J`. This is given as follows:
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/loss_equation.png" align="center"/><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/loss_equation.png" align="center"/><br/>
The parameter `alpha` is a hyperparameter that weights the relative contribution of the norm penalty term, `omega`, relative to the standard objective function `J`.
The most commonly used norm penalties are the L2 norm penalty and the L1 norm penalty. These are given as follows:
##### L2 Regularization:
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/l2_regularization.png" align="center"/><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/l2_regularization.png" align="center"/><br/>
##### L1 Regularization
<img src=".https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/l1_regularization.png" align="center"/><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/l1_regularization.png" align="center"/><br/>
A much more detailed mathematical background of regularization can be found [here](http://www.deeplearningbook.org/contents/regularization.html).
......@@ -40,11 +40,11 @@ The idea of building ops for regularization is in sync with the refactored Paddl
Below is an example of a really simple feed forward neural network.
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/feed_forward.png" align="center"/><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/feed_forward.png" align="center"/><br/>
The Python API will modify this computation graph to add regularization operators. The modified computation graph will look as follows:
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/feed_forward_regularized.png" align="center"/><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/feed_forward_regularized.png" align="center"/><br/>
   
### Python API implementation for Regularization
......
......@@ -116,7 +116,7 @@ The classical DS2 network contains 15 layers (from bottom to top):
- **One** CTC-loss layer
<div align="center">
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/ds2_network.png" width=350><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/ds2_network.png" width=350><br/>
Figure 1. Archetecture of Deep Speech 2 Network.
</div>
......@@ -208,7 +208,7 @@ TODO by Assignees
### Beam Search with CTC and LM
<div align="center">
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/beam_search.png" width=600><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/beam_search.png" width=600><br/>
Figure 2. Algorithm for CTC Beam Search Decoder.
</div>
......
......@@ -199,7 +199,7 @@ Packing the `selected_generation_scores` will get a `LoDTensor`, and each tail i
## LoD and shape changes during decoding
<p align="center">
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/LOD-and-shape-changes-during-decoding.jpg"/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/LOD-and-shape-changes-during-decoding.jpg"/>
</p>
According to the image above, the only phase that changes the LoD is beam search.
......
......@@ -7,14 +7,14 @@ It applies several important concepts in machine learning system design, includi
In our GAN design, we wrap it as a user-friendly easily customized python API to design different models. We take the conditional DC-GAN (Unsupervised Representation Learning with Deep Convolutional Generative Adversarial Networks [https://arxiv.org/abs/1511.06434]) as an example due to its good performance on image generation.
<p align="center">
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/test.dot.png" width = "35%" align="center"/><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/test.dot.png" width = "35%" align="center"/><br/>
Figure 1. The overall running logic of GAN. The black solid arrows indicate the forward pass; the green dashed arrows indicate the backward pass of generator training; the red dashed arrows indicate the backward pass of the discriminator training. The BP pass of the green (red) arrow should only update the parameters in the green (red) boxes. The diamonds indicate the data providers. d\_loss and g\_loss marked in red and green are the two targets we would like to run.
</p>
The operators, layers and functions required/optional to build a GAN demo is summarized in https://github.com/PaddlePaddle/Paddle/issues/4563.
<p align="center">
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/dcgan.png" width = "90%" align="center"/><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/dcgan.png" width = "90%" align="center"/><br/>
Figure 2. Photo borrowed from the original DC-GAN paper.
</p>
......
......@@ -37,7 +37,7 @@ PaddlePaddle每次发新的版本,遵循以下流程:
可以在此页面的"Artifacts"下拉框中找到生成的3个二进制文件,分别对应CAPI,`cp27m``cp27mu`的版本。然后按照上述的方法
使用`twine`工具上传即可。
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/ci_build_whl.png">
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/ci_build_whl.png">
* 注:CI环境使用 https://github.com/PaddlePaddle/buildtools 这里的DockerImage作为编译环境以支持更多的Linux
发型版,如果需要手动编译,也可以使用这些镜像。这些镜像也可以从 https://hub.docker.com/r/paddlepaddle/paddle_manylinux_devel/tags/ 下载得到。
......
......@@ -23,7 +23,7 @@ But how to record the time for the mixed C++ and CUDA program? There many C++ A
The overall flow is shown as the following figure.
<img src="https://github.com/PaddlePaddle/Paddle/tree/develop/doc/fluid/images/profiler.png" align="center"/><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/profiler.png" align="center"/><br/>
### Event
......
......@@ -17,6 +17,7 @@ limitations under the License. */
#include <deque>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
......@@ -96,6 +97,8 @@ class BlockDesc {
*/
void RemoveOp(size_t s, size_t e);
void RemoveVar(const std::string &name) { vars_.erase(name); }
std::vector<OpDesc *> AllOps() const;
size_t OpSize() const { return ops_.size(); }
......
......@@ -142,6 +142,7 @@ class LoDTensor : public Tensor {
return (lod_)[level].size() - 1;
}
// Split LoDTensor and copy to each place specified in places.
std::vector<LoDTensor> SplitLoDTensor(
const std::vector<platform::Place> places) const;
......
......@@ -150,13 +150,30 @@ void ParallelExecutor::BCastParamsToGPUs(
#endif
}
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name) {
void ParallelExecutor::Run(
const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name,
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
platform::RecordBlock b(0);
SplitTensorToPlaces(feed_tensors);
auto fetch_data = member_->executor_->Run(fetch_tensors);
*member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
fetch_data;
}
void ParallelExecutor::SplitTensorToPlaces(
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
for (auto it : feed_tensors) {
auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
for (size_t j = 0; j < member_->places_.size(); ++j) {
// TODO(panxy0718): Do I need to delete this var?
member_->local_scopes_[j]
->Var(it.first)
->GetMutable<LoDTensor>()
->ShareDataWith(lod_tensors[j]);
}
}
}
} // namespace framework
} // namespace paddle
......@@ -42,9 +42,13 @@ class ParallelExecutor {
bool allow_op_delay);
void Run(const std::vector<std::string>& fetch_tensors,
const std::string& fetched_var_name = "fetched_var");
const std::string& fetched_var_name,
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
private:
void SplitTensorToPlaces(
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
ParallelExecutorPrivate* member_;
void BCastParamsToGPUs(const ProgramDesc& startup_program) const;
......
......@@ -193,6 +193,7 @@ if(WITH_DISTRIBUTE)
set_source_files_properties(send_vars_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(send_barrier_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(send_barrier_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(send_recv_op_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS prefetch_op send_op listen_and_serv_op sum_op executor)
else()
set(DEPS_OPS ${DEPS_OPS} send_op prefetch_op recv_op listen_and_serv_op send_vars_op send_barrier_op)
......
......@@ -186,7 +186,8 @@ void AsyncGRPCServer::WaitClientGet(int count) {
void AsyncGRPCServer::RunSyncUpdate() {
::grpc::ServerBuilder builder;
builder.AddListeningPort(address_, ::grpc::InsecureServerCredentials());
builder.AddListeningPort(address_, ::grpc::InsecureServerCredentials(),
&selected_port_);
builder.SetMaxSendMessageSize(std::numeric_limits<int>::max());
builder.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
builder.RegisterService(&service_);
......@@ -196,7 +197,8 @@ void AsyncGRPCServer::RunSyncUpdate() {
cq_prefetch_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
LOG(INFO) << "Server listening on " << address_ << std::endl;
LOG(INFO) << "Server listening on " << address_
<< " selected port: " << selected_port_;
std::function<void()> send_register =
std::bind(&AsyncGRPCServer::TryToRegisterNewSendOne, this);
......
......@@ -63,6 +63,8 @@ class AsyncGRPCServer final {
void SetExecutor(framework::Executor *executor) { executor_ = executor; }
int GetSelectedPort() { return selected_port_; }
const ReceivedMessage Get() { return this->var_recv_queue_.Pop(); }
void Push(const std::string &msg_name) {
......@@ -111,6 +113,7 @@ class AsyncGRPCServer final {
int prefetch_blk_id_;
framework::ProgramDesc *program_;
framework::Executor *executor_;
int selected_port_;
};
}; // namespace detail
......
......@@ -27,8 +27,8 @@ template <typename T>
class MKLDNNMD {
public:
explicit MKLDNNMD(const T* in, const T* w, bool bias)
: in{paddle::framework::vectorize2int(in->dims())},
w{paddle::framework::vectorize2int(w->dims())} {
: in(paddle::framework::vectorize2int(in->dims())),
w(paddle::framework::vectorize2int(w->dims())) {
with_bias_ = bias;
}
......@@ -78,7 +78,7 @@ class MKLDNNMD {
class MKLDNNMemory {
public:
MKLDNNMemory(MKLDNNMD<Tensor>* t, const mkldnn::engine& e)
: md_{t}, engine_{e} {}
: md_(t), engine_(e) {}
virtual ~MKLDNNMemory() = default;
template <typename Output>
......
......@@ -12,20 +12,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <stdint.h>
#include <ostream>
#include <thread>
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/operators/detail/grpc_server.h"
#include "paddle/fluid/operators/listen_and_serv_op.h"
namespace paddle {
namespace operators {
constexpr char kOptimizeBlock[] = "OptimizeBlock";
void RunServer(std::shared_ptr<detail::AsyncGRPCServer> service) {
service->RunSyncUpdate();
VLOG(4) << "RunServer thread end";
......@@ -66,143 +60,138 @@ static void ParallelExecuteBlocks(
for (size_t i = 0; i < fs.size(); ++i) fs[i].wait();
}
class ListenAndServOp : public framework::OperatorBase {
public:
ListenAndServOp(const std::string &type,
const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: OperatorBase(type, inputs, outputs, attrs) {
if (!rpc_service_) {
std::string endpoint = Attr<std::string>("endpoint");
rpc_service_.reset(new detail::AsyncGRPCServer(endpoint));
server_thread_.reset(new std::thread(RunServer, rpc_service_));
}
}
ListenAndServOp::ListenAndServOp(const std::string &type,
const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: OperatorBase(type, inputs, outputs, attrs) {}
void Stop() override {
rpc_service_->Push(LISTEN_TERMINATE_MESSAGE);
server_thread_->join();
int ListenAndServOp::GetSelectedPort() {
return rpc_service_->GetSelectedPort();
}
void ListenAndServOp::Stop() {
rpc_service_->Push(LISTEN_TERMINATE_MESSAGE);
server_thread_->join();
}
void ListenAndServOp::RunImpl(const framework::Scope &scope,
const platform::Place &dev_place) const {
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(dev_place);
framework::Scope &recv_scope = scope.NewScope();
if (!rpc_service_) {
std::string endpoint = Attr<std::string>("endpoint");
rpc_service_.reset(new detail::AsyncGRPCServer(endpoint));
}
void RunImpl(const framework::Scope &scope,
const platform::Place &dev_place) const override {
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(dev_place);
framework::Scope &recv_scope = scope.NewScope();
// FIXME(Yancey1989): initialize rpc server with lazy mode.
rpc_service_->SetScope(&recv_scope);
rpc_service_->SetDevCtx(&dev_ctx);
auto ins = Inputs("X");
auto fan_in = Attr<int>("Fanin");
auto *block = Attr<framework::BlockDesc *>(kOptimizeBlock);
auto *program = block->Program();
size_t num_blocks = program->Size();
PADDLE_ENFORCE_GE(num_blocks, 2,
"server program should have at least 2 blocks");
framework::Executor executor(dev_place);
std::vector<int> block_list;
for (size_t blkid = 1; blkid < num_blocks; ++blkid)
block_list.push_back(blkid);
auto prepared = executor.Prepare(*program, block_list);
prepared.insert(
prepared.begin(),
std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));
// TODO(qiao) set proper fields for table lookup and update
rpc_service_->SetExecutor(&executor);
rpc_service_->SetPrefetchBlkdId(0);
rpc_service_->SetProgram(program);
// TODO(typhoonzero): change this to a while_op for every cluster-batch.
bool exit_flag = false;
// Record received sparse variables, so that
// we could reset those after execute optimize program
std::vector<framework::Variable *> sparse_vars;
while (!exit_flag) {
// Get from multiple trainers, we don't care about the order in which
// the gradients arrives, just add suffix 0~n and merge the gradient.
rpc_service_->SetCond(0);
size_t recv_var_cnt = 0;
int batch_barrier = 0;
while (batch_barrier != fan_in) {
const detail::ReceivedMessage v = rpc_service_->Get();
auto recv_var_name = v.first;
if (recv_var_name == LISTEN_TERMINATE_MESSAGE) {
LOG(INFO) << "received terminate message and exit";
exit_flag = true;
break;
} else if (recv_var_name == BATCH_BARRIER_MESSAGE) {
VLOG(3) << "recv batch barrier message";
batch_barrier++;
continue;
} else {
VLOG(3) << "received grad: " << recv_var_name;
recv_var_cnt++;
auto var = v.second->GetVar();
if (var == nullptr) {
LOG(ERROR) << "Can not find server side var: " << recv_var_name;
PADDLE_THROW("Can not find server side var");
}
if (var->IsType<framework::SelectedRows>()) {
sparse_vars.push_back(var);
}
}
}
if (exit_flag) {
rpc_service_->SetCond(1);
rpc_service_->ShutDown();
auto ins = Inputs("X");
auto fan_in = Attr<int>("Fanin");
auto *block = Attr<framework::BlockDesc *>(kOptimizeBlock);
auto *program = block->Program();
size_t num_blocks = program->Size();
PADDLE_ENFORCE_GE(num_blocks, 2,
"server program should have at least 2 blocks");
framework::Executor executor(dev_place);
std::vector<int> block_list;
for (size_t blkid = 1; blkid < num_blocks; ++blkid) {
block_list.push_back(blkid);
}
auto prepared = executor.Prepare(*program, block_list);
// Insert placeholder for block0 which holds current op itself.
prepared.insert(prepared.begin(),
std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));
rpc_service_->SetScope(&recv_scope);
rpc_service_->SetDevCtx(&dev_ctx);
// TODO(qiao) set proper fields for table lookup and update
rpc_service_->SetExecutor(&executor);
rpc_service_->SetPrefetchBlkdId(0);
rpc_service_->SetProgram(program);
// start the server listening after all member initialized.
server_thread_.reset(new std::thread(RunServer, rpc_service_));
// FIXME(typhoonzero): do we need to wait until the server port is ready?
sleep(5);
// TODO(typhoonzero): change this to a while_op for every cluster-batch.
bool exit_flag = false;
// Record received sparse variables, so that
// we could reset those after execute optimize program
std::vector<framework::Variable *> sparse_vars;
while (!exit_flag) {
// Get from multiple trainers, we don't care about the order in which
// the gradients arrives, just add suffix 0~n and merge the gradient.
rpc_service_->SetCond(0);
size_t recv_var_cnt = 0;
int batch_barrier = 0;
while (batch_barrier != fan_in) {
const detail::ReceivedMessage v = rpc_service_->Get();
auto recv_var_name = v.first;
if (recv_var_name == LISTEN_TERMINATE_MESSAGE) {
LOG(INFO) << "received terminate message and exit";
exit_flag = true;
break;
}
// NOTE: if is_gpu_place, CUDA kernels are laugched by multiple threads
// and this will still work.
// The optimize blocks which have the same parent ID would run parallel
// TODO(Yancey1989): need to use ParallelExecutor for future
int32_t last_parent_blkid = program->Block(1).Parent();
std::vector<size_t> parallel_blkids;
parallel_blkids.push_back(1);
double ts = detail::GetTimestamp();
for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
if (program->Block(blkid).Parent() != last_parent_blkid) {
for (size_t idx : parallel_blkids) VLOG(3) << idx;
ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
&recv_scope);
parallel_blkids.clear();
last_parent_blkid = program->Block(blkid).Parent();
} else if (recv_var_name == BATCH_BARRIER_MESSAGE) {
VLOG(3) << "recv batch barrier message";
batch_barrier++;
continue;
} else {
VLOG(3) << "received grad: " << recv_var_name;
recv_var_cnt++;
auto var = v.second->GetVar();
if (var == nullptr) {
LOG(ERROR) << "Can not find server side var: " << recv_var_name;
PADDLE_THROW("Can not find server side var");
}
if (var->IsType<framework::SelectedRows>()) {
sparse_vars.push_back(var);
}
parallel_blkids.push_back(blkid);
}
ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
&recv_scope);
VLOG(3) << "run all blocks spent " << detail::GetTimestamp() - ts
<< "(ms)";
// Reset the received sparse variables, the sum operator would not
// sum the input sparse variables which rows is empty at the next
// mini-batch.
// TODO(Yancey1989): move the reset action into an operator, we couldn't
// have any hide logic in the operator.
for (auto &var : sparse_vars) {
var->GetMutable<framework::SelectedRows>()->mutable_rows()->clear();
}
}
if (exit_flag) {
rpc_service_->SetCond(1);
// NOTE: does not consider barrier request retry in here, we may use
// global barrier id to resolve this.
rpc_service_->WaitClientGet(fan_in);
sparse_vars.clear();
} // while(true)
}
rpc_service_->ShutDown();
break;
}
protected:
std::shared_ptr<detail::AsyncGRPCServer> rpc_service_;
std::shared_ptr<std::thread> server_thread_;
};
// NOTE: if is_gpu_place, CUDA kernels are laugched by multiple threads
// and this will still work.
// The optimize blocks which have the same parent ID would run parallel
// TODO(Yancey1989): need to use ParallelExecutor for future
int32_t last_parent_blkid = program->Block(1).Parent();
std::vector<size_t> parallel_blkids;
parallel_blkids.push_back(1);
double ts = detail::GetTimestamp();
for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
if (program->Block(blkid).Parent() != last_parent_blkid) {
ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
&recv_scope);
parallel_blkids.clear();
last_parent_blkid = program->Block(blkid).Parent();
}
parallel_blkids.push_back(blkid);
}
ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
&recv_scope);
VLOG(2) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)";
// Reset the received sparse variables, the sum operator would not
// sum the input sparse variables which rows is empty at the next
// mini-batch.
// TODO(Yancey1989): move the reset action into an operator, we couldn't
// have any hide logic in the operator.
for (auto &var : sparse_vars) {
var->GetMutable<framework::SelectedRows>()->mutable_rows()->clear();
}
rpc_service_->SetCond(1);
// FIXME(typhoonzero): use another condition to sync wait clients get.
rpc_service_->WaitClientGet(fan_in);
sparse_vars.clear();
} // while(true)
}
class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
public:
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <stdint.h>
#include <ostream>
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/operators/detail/grpc_server.h"
namespace paddle {
namespace operators {
constexpr char kOptimizeBlock[] = "OptimizeBlock";
void RunServer(std::shared_ptr<detail::AsyncGRPCServer> service);
class ListenAndServOp : public framework::OperatorBase {
public:
ListenAndServOp(const std::string &type,
const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs);
int GetSelectedPort();
void Stop() override;
void RunImpl(const framework::Scope &scope,
const platform::Place &dev_place) const override;
protected:
mutable std::shared_ptr<detail::AsyncGRPCServer> rpc_service_;
mutable std::shared_ptr<std::thread> server_thread_;
};
} // namespace operators
} // namespace paddle
......@@ -73,7 +73,7 @@ class PriorBoxOp : public framework::OperatorWithKernel {
const framework::ExecutionContext& ctx) const override {
return framework::OpKernelType(
framework::ToDataType(ctx.Input<framework::Tensor>("Input")->type()),
platform::CPUPlace());
ctx.device_context());
}
};
......@@ -171,6 +171,5 @@ namespace ops = paddle::operators;
REGISTER_OPERATOR(prior_box, ops::PriorBoxOp, ops::PriorBoxOpMaker,
paddle::framework::EmptyGradOpMaker);
REGISTER_OP_CPU_KERNEL(
prior_box, ops::PriorBoxOpKernel<paddle::platform::CPUPlace, float>,
ops::PriorBoxOpKernel<paddle::platform::CPUPlace, double>);
REGISTER_OP_CPU_KERNEL(prior_box, ops::PriorBoxOpKernel<float>,
ops::PriorBoxOpKernel<double>);
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/prior_box_op.h"
namespace paddle {
namespace operators {
template <typename T>
__device__ inline T clip(T in) {
return min(max(in, 0.), 1.);
}
template <typename T>
__global__ void GenPriorBox(T* out, const T* aspect_ratios, const int height,
const int width, const int im_height,
const int im_width, const int as_num,
const T offset, const T step_width,
const T step_height, const T* min_sizes,
const T* max_sizes, const int min_num,
bool is_clip) {
int num_priors = max_sizes ? as_num * min_num + min_num : as_num * min_num;
int box_num = height * width * num_priors;
for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < box_num;
i += blockDim.x * gridDim.x) {
int h = i / (num_priors * width);
int w = (i / num_priors) % width;
int p = i % num_priors;
int m = max_sizes ? p / (as_num + 1) : p / as_num;
T cx = (w + offset) * step_width;
T cy = (h + offset) * step_height;
T bw, bh;
T min_size = min_sizes[m];
if (max_sizes) {
int s = p % (as_num + 1);
if (s < as_num) {
T ar = aspect_ratios[s];
bw = min_size * sqrt(ar) / 2.;
bh = min_size / sqrt(ar) / 2.;
} else {
T max_size = max_sizes[m];
bw = sqrt(min_size * max_size) / 2.;
bh = bw;
}
} else {
int s = p % as_num;
T ar = aspect_ratios[s];
bw = min_size * sqrt(ar) / 2.;
bh = min_size / sqrt(ar) / 2.;
}
T xmin = (cx - bw) / im_width;
T ymin = (cy - bh) / im_height;
T xmax = (cx + bw) / im_width;
T ymax = (cy + bh) / im_height;
out[i * 4] = is_clip ? clip<T>(xmin) : xmin;
out[i * 4 + 1] = is_clip ? clip<T>(ymin) : ymin;
out[i * 4 + 2] = is_clip ? clip<T>(xmax) : xmax;
out[i * 4 + 3] = is_clip ? clip<T>(ymax) : ymax;
}
}
template <typename T>
__global__ void SetVariance(T* out, const T* var, const int vnum,
const int num) {
for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < num;
i += blockDim.x * gridDim.x) {
out[i] = var[i % vnum];
}
}
template <typename T>
class PriorBoxOpCUDAKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
auto* input = ctx.Input<paddle::framework::Tensor>("Input");
auto* image = ctx.Input<paddle::framework::Tensor>("Image");
auto* boxes = ctx.Output<paddle::framework::Tensor>("Boxes");
auto* vars = ctx.Output<paddle::framework::Tensor>("Variances");
auto min_sizes = ctx.Attr<std::vector<float>>("min_sizes");
auto max_sizes = ctx.Attr<std::vector<float>>("max_sizes");
auto input_aspect_ratio = ctx.Attr<std::vector<float>>("aspect_ratios");
auto variances = ctx.Attr<std::vector<float>>("variances");
auto flip = ctx.Attr<bool>("flip");
auto clip = ctx.Attr<bool>("clip");
std::vector<float> aspect_ratios;
ExpandAspectRatios(input_aspect_ratio, flip, aspect_ratios);
T step_w = static_cast<T>(ctx.Attr<float>("step_w"));
T step_h = static_cast<T>(ctx.Attr<float>("step_h"));
T offset = static_cast<T>(ctx.Attr<float>("offset"));
auto im_width = image->dims()[3];
auto im_height = image->dims()[2];
auto width = input->dims()[3];
auto height = input->dims()[2];
T step_width, step_height;
if (step_w == 0 || step_h == 0) {
step_width = static_cast<T>(im_width) / width;
step_height = static_cast<T>(im_height) / height;
} else {
step_width = step_w;
step_height = step_h;
}
int num_priors = aspect_ratios.size() * min_sizes.size();
if (max_sizes.size() > 0) {
num_priors += max_sizes.size();
}
int min_num = static_cast<int>(min_sizes.size());
int box_num = width * height * num_priors;
int block = 512;
int grid = (box_num + block - 1) / block;
auto stream =
ctx.template device_context<platform::CUDADeviceContext>().stream();
boxes->mutable_data<T>(ctx.GetPlace());
vars->mutable_data<T>(ctx.GetPlace());
framework::Tensor r;
framework::TensorFromVector(aspect_ratios, ctx.device_context(), &r);
framework::Tensor min;
framework::TensorFromVector(min_sizes, ctx.device_context(), &min);
T* max_data = nullptr;
framework::Tensor max;
if (max_sizes.size() > 0) {
framework::TensorFromVector(max_sizes, ctx.device_context(), &max);
max_data = max.data<T>();
}
GenPriorBox<T><<<grid, block, 0, stream>>>(
boxes->data<T>(), r.data<T>(), height, width, im_height, im_width,
aspect_ratios.size(), offset, step_width, step_height, min.data<T>(),
max_data, min_num, clip);
framework::Tensor v;
framework::TensorFromVector(variances, ctx.device_context(), &v);
grid = (box_num * 4 + block - 1) / block;
SetVariance<T><<<grid, block, 0, stream>>>(vars->data<T>(), v.data<T>(),
variances.size(), box_num * 4);
}
}; // namespace operators
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OP_CUDA_KERNEL(prior_box, ops::PriorBoxOpCUDAKernel<float>,
ops::PriorBoxOpCUDAKernel<double>);
......@@ -51,7 +51,7 @@ struct ClipFunctor {
}
};
template <typename Place, typename T>
template <typename T>
class PriorBoxOpKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
......@@ -106,49 +106,24 @@ class PriorBoxOpKernel : public framework::OpKernel<T> {
int idx = 0;
for (size_t s = 0; s < min_sizes.size(); ++s) {
auto min_size = min_sizes[s];
// first prior: aspect_ratio = 1, size = min_size
box_width = box_height = min_size / 2.;
// xmin
e_boxes(h, w, idx, 0) = (center_x - box_width) / img_width;
// ymin
e_boxes(h, w, idx, 1) = (center_y - box_height) / img_height;
// xmax
e_boxes(h, w, idx, 2) = (center_x + box_width) / img_width;
// ymax
e_boxes(h, w, idx, 3) = (center_y + box_height) / img_height;
idx++;
if (max_sizes.size() > 0) {
auto max_size = max_sizes[s];
// second prior: aspect_ratio = 1,
// size = sqrt(min_size * max_size)
box_width = box_height = sqrt(min_size * max_size) / 2.;
// xmin
// priors with different aspect ratios
for (size_t r = 0; r < aspect_ratios.size(); ++r) {
float ar = aspect_ratios[r];
box_width = min_size * sqrt(ar) / 2.;
box_height = min_size / sqrt(ar) / 2.;
e_boxes(h, w, idx, 0) = (center_x - box_width) / img_width;
// ymin
e_boxes(h, w, idx, 1) = (center_y - box_height) / img_height;
// xmax
e_boxes(h, w, idx, 2) = (center_x + box_width) / img_width;
// ymax
e_boxes(h, w, idx, 3) = (center_y + box_height) / img_height;
idx++;
}
// rest of priors
for (size_t r = 0; r < aspect_ratios.size(); ++r) {
float ar = aspect_ratios[r];
if (fabs(ar - 1.) < 1e-6) {
continue;
}
box_width = min_size * sqrt(ar) / 2.;
box_height = min_size / sqrt(ar) / 2.;
// xmin
if (max_sizes.size() > 0) {
auto max_size = max_sizes[s];
// square prior with size sqrt(minSize * maxSize)
box_width = box_height = sqrt(min_size * max_size) / 2.;
e_boxes(h, w, idx, 0) = (center_x - box_width) / img_width;
// ymin
e_boxes(h, w, idx, 1) = (center_y - box_height) / img_height;
// xmax
e_boxes(h, w, idx, 2) = (center_x + box_width) / img_width;
// ymax
e_boxes(h, w, idx, 3) = (center_y + box_height) / img_height;
idx++;
}
......
......@@ -20,6 +20,7 @@ limitations under the License. */
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/operators/listen_and_serv_op.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h"
#include "paddle/fluid/string/printf.h"
......@@ -34,6 +35,7 @@ namespace m = paddle::operators::math;
// global for simplicity.
std::unique_ptr<f::OperatorBase> listen_and_serv_op;
int selected_port;
void InitTensorsInScope(f::Scope &scope, p::CPUPlace &place) {
p::CPUDeviceContext ctx(place);
......@@ -128,14 +130,16 @@ void StartServerNet(bool is_sparse) {
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block);
f::AttributeMap attrs;
attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
attrs.insert({"endpoint", std::string("127.0.0.1:0")});
attrs.insert({"Fanin", 1});
attrs.insert({"ParamList", std::vector<std::string>({"Out"})});
attrs.insert({"GradList", std::vector<std::string>({"x1"})});
attrs.insert({"OptimizeBlock", optimize_block});
listen_and_serv_op =
f::OpRegistry::CreateOp("listen_and_serv", {{"X", {"x1"}}}, {}, attrs);
LOG(INFO) << "selected port before run " << selected_port;
listen_and_serv_op->Run(scope, place);
LOG(INFO) << "server exit";
}
TEST(SendRecvOp, CPUDense) {
......@@ -149,12 +153,19 @@ TEST(SendRecvOp, CPUDense) {
scope.Var("RPC_CLIENT_VAR");
f::AttributeMap attrs;
attrs.insert({"endpoints", std::vector<std::string>({"127.0.0.1:6174"})});
attrs.insert({"epmap", std::vector<std::string>({"127.0.0.1:6174"})});
selected_port = static_cast<paddle::operators::ListenAndServOp *>(
listen_and_serv_op.get())
->GetSelectedPort();
LOG(INFO) << "selected port " << selected_port;
std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port);
attrs.insert({"endpoints", std::vector<std::string>({endpoint})});
attrs.insert({"epmap", std::vector<std::string>({endpoint})});
auto send_op = f::OpRegistry::CreateOp(
"send", {{"X", {"x1"}}},
{{"Out", {"Out"}}, {"RPCClient", {"RPC_CLIENT_VAR"}}}, attrs);
LOG(INFO) << "before run " << endpoint;
send_op->Run(scope, place);
LOG(INFO) << "end run";
auto in_var = scope.Var("x1");
auto tensor = in_var->GetMutable<f::LoDTensor>();
......@@ -167,6 +178,7 @@ TEST(SendRecvOp, CPUDense) {
for (int64_t i = 0; i < target->numel(); ++i) {
EXPECT_EQ(expected[i] * 2, actual[i]);
}
LOG(INFO) << "before stop";
listen_and_serv_op->Stop();
server_thread.join();
listen_and_serv_op.reset(nullptr);
......@@ -182,8 +194,13 @@ TEST(SendRecvOp, CPUSparse) {
InitSelectedRowsInScope(scope, place);
scope.Var("RPC_CLIENT_VAR");
f::AttributeMap attrs;
attrs.insert({"endpoints", std::vector<std::string>({"127.0.0.1:6174"})});
attrs.insert({"epmap", std::vector<std::string>({"127.0.0.1:6174"})});
selected_port = static_cast<paddle::operators::ListenAndServOp *>(
listen_and_serv_op.get())
->GetSelectedPort();
LOG(INFO) << "selected port " << selected_port;
std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port);
attrs.insert({"endpoints", std::vector<std::string>({endpoint})});
attrs.insert({"epmap", std::vector<std::string>({endpoint})});
auto send_op = f::OpRegistry::CreateOp(
"send", {{"X", {"x1"}}},
{{"Out", {"Out"}}, {"RPCClient", {"RPC_CLIENT_VAR"}}}, attrs);
......
......@@ -15,6 +15,8 @@ limitations under the License. */
#include "paddle/fluid/pybind/protobuf.h"
#include <deque>
#include <iostream>
#include <string>
#include <tuple>
#include "paddle/fluid/framework/backward.h"
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/op_desc.h"
......@@ -98,7 +100,7 @@ namespace pybind {
using namespace paddle::framework; // NOLINT
template <typename T>
static py::bytes SerializeMessage(T &self) {
static py::bytes SerializeMessage(T &self) { // NOLINT
// Check IsInitialized in Python
std::string retv;
PADDLE_ENFORCE(self.Proto()->SerializePartialToString(&retv),
......@@ -107,7 +109,7 @@ static py::bytes SerializeMessage(T &self) {
}
// Bind Methods
void BindProgramDesc(py::module &m) {
void BindProgramDesc(py::module &m) { // NOLINT
py::class_<ProgramDesc>(m, "ProgramDesc", "")
.def(py::init<>())
.def("__init__",
......@@ -151,7 +153,7 @@ void BindProgramDesc(py::module &m) {
});
}
void BindBlockDesc(py::module &m) {
void BindBlockDesc(py::module &m) { // NOLINT
py::class_<BlockDesc>(m, "BlockDesc", "")
.def_property_readonly("id", &BlockDesc::ID)
.def_property_readonly("parent", &BlockDesc::Parent)
......@@ -200,13 +202,19 @@ void BindBlockDesc(py::module &m) {
return self.FindVarRecursive(name);
},
py::return_value_policy::reference)
.def("remove_var",
[](BlockDesc &self, py::bytes byte_name) {
std::string name = byte_name;
return self.RemoveVar(name);
},
py::return_value_policy::reference)
.def("all_vars", &BlockDesc::AllVars, py::return_value_policy::reference)
.def("op_size", &BlockDesc::OpSize)
.def("op", &BlockDesc::Op, py::return_value_policy::reference)
.def("serialize_to_string", SerializeMessage<BlockDesc>);
}
void BindVarDsec(py::module &m) {
void BindVarDsec(py::module &m) { // NOLINT
py::class_<VarDesc> var_desc(m, "VarDesc", "");
var_desc
.def("name",
......@@ -257,7 +265,7 @@ void BindVarDsec(py::module &m) {
.value("RAW", proto::VarType::RAW);
}
void BindOpDesc(py::module &m) {
void BindOpDesc(py::module &m) { // NOLINT
py::enum_<proto::AttrType>(m, "AttrType", "")
.value("INT", proto::AttrType::INT)
.value("INTS", proto::AttrType::INTS)
......
......@@ -408,11 +408,7 @@ class DistributeTranspiler:
pserver_vars = pserver_program.global_block().vars
created_var_map = dict()
for _, var in pserver_vars.iteritems():
tmpvar = s_prog.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
tmpvar = s_prog.global_block().clone_variable(var)
created_var_map[var.name] = tmpvar
# 2. rename op outputs
......@@ -708,11 +704,7 @@ class DistributeTranspiler:
varlist = [varlist]
for var in varlist:
program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
program.global_block().clone_variable(var)
optimize_block.append_op(
type=opt_op.type,
......
......@@ -946,13 +946,20 @@ class Block(object):
The new variable cloned from 'var' in current block.
"""
assert isinstance(var, Variable)
return self.create_var(
name=var.name,
shape=var.shape,
dtype=var.dtype,
type=var.type,
lod_level=var.lod_level,
persistable=True)
ret_var = None
# make STEP_SCOPES var can be safely cloned.
if var.type == core.VarDesc.VarType.STEP_SCOPES:
ret_var = self.create_var(
name=var.name, persistable=var.persistable, type=var.type)
else:
ret_var = self.create_var(
name=var.name,
shape=var.shape,
dtype=var.dtype,
type=var.type,
lod_level=var.lod_level,
persistable=True)
return ret_var
class Program(object):
......
......@@ -26,25 +26,29 @@ class ParallelExecutor(object):
use_cuda,
num_threads=None,
allow_op_delay=False):
places = []
self._places = []
self._act_places = []
if use_cuda:
for i in xrange(core.get_cuda_device_count()):
p = core.Place()
p.set_place(core.CUDAPlace(i))
places.append(p)
self._act_places.append(core.CUDAPlace(i))
p.set_place(self._act_places[-1])
self._places.append(p)
else:
for i in xrange(multiprocessing.cpu_count()):
p = core.Place()
p.set_place(core.CPUPlace())
places.append(p)
self._act_places.append(core.CPUPlace(i))
p.set_place(self._act_places[-1])
self._places.append(p)
assert self._places, "no place for execution"
if num_threads is None:
if use_cuda:
# Experiments on se-resnext shows that too many threads hurt
# performance. Worth tunning for other models in the future.
num_threads = len(places)
num_threads = len(self._places)
else:
min(len(places) * 2, multiprocessing.cpu_count())
min(len(self._places) * 2, multiprocessing.cpu_count())
startup = framework.default_startup_program()
main = framework.default_main_program()
......@@ -53,7 +57,7 @@ class ParallelExecutor(object):
self.executor = core.ParallelExecutor(
num_threads,
True if use_cuda else False, # use_event
places,
self._places,
set([
p.name for p in main.global_block().iter_parameters()
if not p.stop_gradient
......@@ -65,8 +69,25 @@ class ParallelExecutor(object):
allow_op_delay)
self.scope = scope
def run(self, fetch_list):
def run(self, fetch_list, feed_dict={}):
"""
:param fetch_list: A list of variable names that will be fetched.
:param feed_dict: A dict mapping for feed variable name to LoDTensor
or numpy array.
:return: fetched value list.
"""
if not isinstance(feed_dict, dict):
raise TypeError("feed_dict should be a dict")
feed_tensor_dict = {}
for i, feed_name in enumerate(feed_dict):
feed_tensor = feed_dict[feed_name]
if not isinstance(feed_tensor, core.LoDTensor):
feed_tensor = core.LoDTensor()
feed_tensor.set(feed_dict[feed_name], self._act_places[0])
feed_tensor_dict[feed_name] = feed_tensor
fetch_var_name = '@FETCHED_VAR_NAME@'
self.executor.run(fetch_list, fetch_var_name)
self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict)
arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
return [arr[i] for i in range(len(arr))]
......@@ -21,13 +21,17 @@ import paddle.dataset.mnist as mnist
import paddle.dataset.wmt16 as wmt16
def simple_fc_net():
reader = fluid.layers.open_recordio_file(
filename='./mnist.recordio',
shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
img, label = fluid.layers.read_file(reader)
def simple_fc_net(use_feed):
if use_feed:
img = fluid.layers.data(name='image', shape=[784], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
else:
reader = fluid.layers.open_recordio_file(
filename='./mnist.recordio',
shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
img, label = fluid.layers.read_file(reader)
hidden = img
for _ in xrange(4):
hidden = fluid.layers.fc(
......@@ -42,13 +46,18 @@ def simple_fc_net():
return loss
def fc_with_batchnorm():
reader = fluid.layers.open_recordio_file(
filename='./mnist.recordio',
shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
img, label = fluid.layers.read_file(reader)
def fc_with_batchnorm(use_feed):
if use_feed:
img = fluid.layers.data(name='image', shape=[784], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
else:
reader = fluid.layers.open_recordio_file(
filename='./mnist.recordio',
shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
img, label = fluid.layers.read_file(reader)
hidden = img
for _ in xrange(1):
hidden = fluid.layers.fc(
......@@ -135,7 +144,9 @@ def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio):
return fluid.layers.elementwise_add(x=short, y=scale, act='relu')
def SE_ResNeXt152Small(batch_size=2):
def SE_ResNeXt152Small(batch_size=2, use_feed=False):
assert not use_feed, "SE_ResNeXt doesn't support feed yet"
img = fluid.layers.fill_constant(
shape=[batch_size, 3, 224, 224], dtype='float32', value=0.0)
label = fluid.layers.fill_constant(
......@@ -185,30 +196,28 @@ class TestParallelExecutorBase(unittest.TestCase):
memory_opt=True,
iter=10,
batch_size=None,
allow_op_delay=False):
allow_op_delay=False,
feed_dict={}):
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
loss = method()
loss = method(use_feed=len(feed_dict) > 0)
adam = fluid.optimizer.Adam()
adam.minimize(loss)
if memory_opt:
fluid.memory_optimize(main)
exe = fluid.ParallelExecutor(
loss_name=loss.name,
use_cuda=True,
allow_op_delay=allow_op_delay)
exe = fluid.ParallelExecutor(loss_name=loss.name, use_cuda=True)
if batch_size is not None:
batch_size *= fluid.core.get_cuda_device_count()
begin = time.time()
first_loss, = exe.run([loss.name])
first_loss, = exe.run([loss.name], feed_dict=feed_dict)
first_loss = numpy.array(first_loss)
for i in xrange(iter):
exe.run([])
exe.run([], feed_dict=feed_dict)
last_loss, = exe.run([loss.name])
last_loss, = exe.run([loss.name], feed_dict=feed_dict)
end = time.time()
if batch_size is not None:
......@@ -242,9 +251,19 @@ class TestMNIST(TestParallelExecutorBase):
self.check_network_convergence(simple_fc_net)
self.check_network_convergence(simple_fc_net, allow_op_delay=True)
img = numpy.zeros(shape=[32, 784], dtype='float32')
label = numpy.ones(shape=[32, 1], dtype='int64')
self.check_network_convergence(
simple_fc_net, feed_dict={"image": img,
"label": label})
def test_batchnorm_fc(self):
self.check_network_convergence(fc_with_batchnorm)
self.check_network_convergence(fc_with_batchnorm, allow_op_delay=True)
img = numpy.zeros(shape=[32, 784], dtype='float32')
label = numpy.ones(shape=[32, 1], dtype='int64')
self.check_network_convergence(
fc_with_batchnorm, feed_dict={"image": img,
"label": label})
class TestResnet(TestParallelExecutorBase):
......@@ -400,7 +419,8 @@ def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head):
import transformer_model
def transformer():
def transformer(use_feed):
assert not use_feed, "transfomer doesn't support feed yet"
return transformer_model.transformer(
ModelHyperParams.src_vocab_size + 1,
ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1,
......
......@@ -28,7 +28,6 @@ class TestPriorBoxOp(OpTest):
self.attrs = {
'min_sizes': self.min_sizes,
'max_sizes': self.max_sizes,
'aspect_ratios': self.aspect_ratios,
'variances': self.variances,
'flip': self.flip,
......@@ -37,25 +36,28 @@ class TestPriorBoxOp(OpTest):
'step_h': self.step_h,
'offset': self.offset
}
if len(self.max_sizes) > 0:
self.attrs['max_sizes'] = self.max_sizes
self.outputs = {'Boxes': self.out_boxes, 'Variances': self.out_var}
def test_check_output(self):
self.check_output()
def test_check_grad(self):
return
def setUp(self):
self.op_type = "prior_box"
self.set_data()
def set_max_sizes(self):
max_sizes = [5, 10]
self.max_sizes = np.array(max_sizes).astype('float32').tolist()
def init_test_params(self):
self.layer_w = 4
self.layer_h = 4
self.layer_w = 32
self.layer_h = 32
self.image_w = 20
self.image_h = 20
self.image_w = 40
self.image_h = 40
self.step_w = float(self.image_w) / float(self.layer_w)
self.step_h = float(self.image_h) / float(self.layer_h)
......@@ -66,8 +68,7 @@ class TestPriorBoxOp(OpTest):
self.min_sizes = [2, 4]
self.min_sizes = np.array(self.min_sizes).astype('float32').tolist()
self.max_sizes = [5, 10]
self.max_sizes = np.array(self.max_sizes).astype('float32').tolist()
self.set_max_sizes()
self.aspect_ratios = [2.0, 3.0]
self.flip = True
self.real_aspect_ratios = [1, 2.0, 1.0 / 2.0, 3.0, 1.0 / 3.0]
......@@ -79,7 +80,7 @@ class TestPriorBoxOp(OpTest):
self.clip = True
self.num_priors = len(self.real_aspect_ratios) * len(self.min_sizes)
if len(self.max_sizes) > 1:
if len(self.max_sizes) > 0:
self.num_priors += len(self.max_sizes)
self.offset = 0.5
......@@ -105,35 +106,27 @@ class TestPriorBoxOp(OpTest):
idx = 0
for s in range(len(self.min_sizes)):
min_size = self.min_sizes[s]
c_w = c_h = min_size / 2.
out_boxes[h, w, idx, :] = [
(c_x - c_w) / self.image_w, (c_y - c_h) / self.image_h,
(c_x + c_w) / self.image_w, (c_y + c_h) / self.image_h
]
idx += 1
if len(self.max_sizes) > 0:
max_size = self.max_sizes[s]
# second prior: aspect_ratio = 1,
c_w = c_h = math.sqrt(min_size * max_size) / 2
# rest of priors
for r in range(len(self.real_aspect_ratios)):
ar = self.real_aspect_ratios[r]
c_w = min_size * math.sqrt(ar) / 2
c_h = (min_size / math.sqrt(ar)) / 2
out_boxes[h, w, idx, :] = [(c_x - c_w) / self.image_w,
(c_y - c_h) / self.image_h,
(c_x + c_w) / self.image_w,
(c_y + c_h) / self.image_h]
idx += 1
# rest of priors
for r in range(len(self.real_aspect_ratios)):
ar = self.real_aspect_ratios[r]
if math.fabs(ar - 1.) < 1e-6:
continue
c_w = min_size * math.sqrt(ar) / 2
c_h = (min_size / math.sqrt(ar)) / 2
if len(self.max_sizes) > 0:
max_size = self.max_sizes[s]
# second prior: aspect_ratio = 1,
c_w = c_h = math.sqrt(min_size * max_size) / 2
out_boxes[h, w, idx, :] = [(c_x - c_w) / self.image_w,
(c_y - c_h) / self.image_h,
(c_x + c_w) / self.image_w,
(c_y + c_h) / self.image_h]
idx += 1
# clip the prior's coordidate such that it is within[0, 1]
if self.clip:
out_boxes = np.clip(out_boxes, 0.0, 1.0)
......@@ -144,5 +137,10 @@ class TestPriorBoxOp(OpTest):
self.out_var = out_var.astype('float32')
class TestPriorBoxOpWithMaxSize(TestPriorBoxOp):
def set_max_sizes(self):
self.max_sizes = []
if __name__ == '__main__':
unittest.main()
......@@ -19,9 +19,9 @@ from paddle.fluid.framework import Program
class TestOpDesc(unittest.TestCase):
def test_op_desc(self):
prog = core.ProgramDesc()
self.assertIsNotNone(prog)
block = prog.block(0)
program_desc = core.ProgramDesc()
self.assertIsNotNone(program_desc)
block = program_desc.block(0)
self.assertIsNotNone(block)
op = block.append_op()
self.assertIsNotNone(op)
......@@ -67,7 +67,7 @@ class TestOpDesc(unittest.TestCase):
self.assertEqual(8, len(op.attr_names()))
op.set_block_attr("block_attr", prog.block(0))
op.set_block_attr("block_attr", program_desc.block(0))
self.assertEqual(0, op.block_attr("block_attr"))
mul_op = block.append_op()
......@@ -88,20 +88,20 @@ class TestProgramDesc(unittest.TestCase):
del program_desc
def test_append_block(self):
prog_desc = core.ProgramDesc()
self.assertIsNotNone(prog_desc)
block_root = prog_desc.block(0)
program_desc = core.ProgramDesc()
self.assertIsNotNone(program_desc)
block_root = program_desc.block(0)
self.assertIsNotNone(block_root)
self.assertEqual(block_root.id, 0)
block1 = prog_desc.append_block(block_root)
block2 = prog_desc.append_block(block1)
block1 = program_desc.append_block(block_root)
block2 = program_desc.append_block(block1)
self.assertIsNotNone(block1)
self.assertEqual(block1.id, block2.parent)
self.assertEqual(block_root.id, block1.parent)
block3 = prog_desc.append_block(block_root)
block3 = program_desc.append_block(block_root)
self.assertEqual(block3.parent, block_root.id)
self.assertEqual(prog_desc.block(1).id, 1)
self.assertEqual(4, prog_desc.num_blocks())
self.assertEqual(program_desc.block(1).id, 1)
self.assertEqual(4, program_desc.num_blocks())
class TestVarDesc(unittest.TestCase):
......@@ -162,9 +162,9 @@ class TestVarDesc(unittest.TestCase):
class TestBlockDesc(unittest.TestCase):
def test_add_var(self):
prog = core.ProgramDesc()
self.assertIsNotNone(prog)
block = prog.block(0)
program_desc = core.ProgramDesc()
self.assertIsNotNone(program_desc)
block = program_desc.block(0)
self.assertIsNotNone(block)
var1 = block.var("var1")
var2 = block.var("var2")
......@@ -175,9 +175,9 @@ class TestBlockDesc(unittest.TestCase):
self.assertEqual(var2_re, var2)
def test_add_op(self):
prog = core.ProgramDesc()
self.assertIsNotNone(prog)
block = prog.block(0)
program_desc = core.ProgramDesc()
self.assertIsNotNone(program_desc)
block = program_desc.block(0)
self.assertIsNotNone(block)
op1 = block.append_op()
op2 = block.append_op()
......@@ -189,9 +189,9 @@ class TestBlockDesc(unittest.TestCase):
def test_remove_op(self):
program = Program()
prog = program.desc
self.assertIsNotNone(prog)
block = prog.block(0)
program_desc = program.desc
self.assertIsNotNone(program_desc)
block = program_desc.block(0)
self.assertIsNotNone(block)
op0 = block.append_op()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册