Commit ad2e4206 authored by: Y yuyang18

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into feature/matmul_support_float16_double
...@@ -9,7 +9,7 @@ import subprocess
import platform
COPYRIGHT = '''
Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......
# Embed Paddle Inference in Your Application
Paddle inference offers APIs in the `C` and `C++` languages.
A model trained with Paddle can easily be deployed by following the steps below:
1. Optimize the native model;
2. Write some code for deployment.
Let's explain the steps in detail.
## Optimize the native Fluid Model
The native model obtained from the training phase needs to be optimized for inference:
- Remove noise such as cost operators that are not needed during inference;
- Prune unnecessary computation branches that do not contribute to the output;
- Remove extraneous variables;
- Reuse memory in the native Fluid executor;
- Translate the model storage format into a third-party engine's format, so that the inference API can use that engine for acceleration.
We have an official tool for this optimization; run `paddle_inference_optimize --help` for more information.
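A minimal sketch of invoking the tool is shown below. Only `--help` is documented above; the `--model_dir` and `--output_dir` flags here are hypothetical placeholders, so check the help output for the actual option names.
```bash
# Print the supported options.
paddle_inference_optimize --help

# Hypothetical invocation: optimize a trained Fluid model for deployment.
# The flag names below are assumptions; consult --help for the real interface.
paddle_inference_optimize \
    --model_dir=./trained_model \
    --output_dir=./optimized_model
```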
## Write some code
Read `paddle_inference_api.h` for more information; the header is reproduced below.
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <string>
#include <vector>
namespace paddle {
class Predictor {
public:
struct Attr;
Predictor() = default;
// Build the network before inference.
bool Init(const Attr& attr);
// Predict one record.
// Arguments:
// inputs: the names of the input variables.
// outputs: the names of the output variables.
// input_shapes: the shape of the input variables.
// output_shapes: the shape of the output variables.
// input_data: the data of the input variables.
// output_data: the data of the output variables.
bool Run(const std::vector<std::string>& inputs,
const std::vector<std::string>& outputs,
const std::vector<std::vector<int>>& input_shapes,
const std::vector<std::vector<int>>& output_shapes,
const std::vector<std::vector<float>>& input_data,
std::vector<std::vector<float>>* output_data);
// Clone a predictor that shares the model weights.
Predictor* Clone();
// Destroy the Predictor.
~Predictor();
struct Attr {
enum class EngineKind;
std::string model_dir; // path to the model directory.
bool enable_engine{false}; // Enable to execute (part of) the model on
// third-party engines.
EngineKind engine_kind{Attr::EngineKind::kNone};
enum class EngineKind {
kNone = -1, // Use the native Fluid facility.
kAnakin, // Use Anakin for inference.
kTensorRT, // Use TensorRT for inference.
kAutoMixedAnakin, // Automatically mix Fluid with Anakin.
kAutoMixedTensorRT, // Automatically mix Fluid with TensorRT.
};
};
};
} // namespace paddle
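A minimal usage sketch of the proposed API above. The model path, variable names, and shapes are placeholders for a real model, error handling is omitted, and `EngineKind` keeps its default value `kNone` (native Fluid execution):
```cpp
#include <string>
#include <vector>

#include "paddle_inference_api.h"  // the header shown above

int main() {
  // Point Attr at the optimized model directory (placeholder path).
  paddle::Predictor::Attr attr;
  attr.model_dir = "./optimized_model";

  paddle::Predictor predictor;
  if (!predictor.Init(attr)) return 1;

  // One input variable "x" of shape [1, 13] and one output variable "y";
  // names and shapes are placeholders for a real model.
  std::vector<std::vector<float>> input_data{std::vector<float>(13, 0.f)};
  std::vector<std::vector<float>> output_data;
  bool ok = predictor.Run({"x"}, {"y"},
                          /*input_shapes=*/{{1, 13}},
                          /*output_shapes=*/{{1, 1}},
                          input_data, &output_data);
  return ok ? 0 : 1;
}
```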
...@@ -77,8 +77,7 @@ print "The sematic-vector of testA: ", paddle.infer(fA, parameters, testA)
### Example 2. Sharing Parameters between "Models"
-We use [GAN](https://github.com/PaddlePaddle/book/tree/develop/gan) in
-this example. In the following example program, `d0` and `d1`
+We use GAN in this example. In the following example program, `d0` and `d1`
correspond to the two networks in the following figure:
<img src="https://github.com/wangyang59/book/raw/00036f4b0da5225041a6824587c1a01cf20159b1/gan/image/gan_ig.png" width=400 />
......
...@@ -75,7 +75,7 @@ Different layout leads to different implementation of the operator kernel. There
- The inference of Layout is at run-time, not at compile-time.
-- Every operator has to implement different kernels for different layouts. Let's take MKLDNN as an example. If we want to implement an MKLDNN convolution operator, we have to implement all the kernels for different layouts, which are listed [here](http://01org.github.io/mkl-dnn/structmkldnn_1_1memory.html). And we will have a special macro to register kernels for MKLDNN operators.
+- Every operator has to implement different kernels for different layouts. Let's take MKLDNN as an example. If we want to implement an MKLDNN convolution operator, we have to implement all the kernels for different layouts, which are listed [here](http://intel.github.io/mkl-dnn/structmkldnn_1_1memory.html). And we will have a special macro to register kernels for MKLDNN operators.
`Layout` is also defined as an enum variable:
......
# Distributed Training with NCCL2 and RDMA
When doing distributed multi-GPU training, network bandwidth often becomes the
bottleneck. We introduce a way to use NCCL2 for such training jobs to
achieve the best performance.
## Prepare Hardware with RDMA and Multiple GPUs
I'm using two Linux servers, each installed with 8 GPUs and
one 100Gb RDMA card.
The base environment is:
* OS: CentOS 7.4
* RDMA device: "Mellanox Technologies MT27700 Family [ConnectX-4]"
* Kernel version: `4.4.88-1.el7.elrepo.x86_64`
* Docker version: `1.12.6`
* Docker storage driver: `overlay2`
* IP addresses: 192.168.16.30,192.168.16.34
In general, the steps include:
1. Install GPU drivers
1. Install RDMA drivers
1. Install "InfiniBand Support"
1. Use docker to run tests and make sure GPUs and RDMA can work inside
the container.
I'll omit the "Install GPU drivers" section because instructions can easily
be found elsewhere.
### Install RDMA drivers
In my case, I've got two machines with the device
"Mellanox Technologies MT27700 Family [ConnectX-4]" installed. The OS was
"CentOS 7.4" and I updated the kernel to version 4.4 so that Docker could
work with the latest overlay2 filesystem.
***NOTE: before you start, make sure you have a way to get a console
of the server other than ssh because we may need to re-configure the
network device.***
1. Go to http://www.mellanox.com/page/products_dyn?product_family=26,
download `MLNX_OFED` software in the bottom of the page, and upload it
onto the server.
1. Run `./mlnxofedinstall --add-kernel-support` from the extracted software package.
1. Run `/etc/init.d/openibd restart` to make everything work. Note that
   this operation may cause the network to go down if you are using this
   RDMA device as the default network device and are logged in to the server via ssh.
1. Re-configure the network interface, for example:
`ifconfig eth2 192.168.16.30/20 up`, then add routes if needed:
`ip route add default via 192.168.16.1 dev eth2`.
1. Do the same thing on the other node.
1. Use `ping` to test whether the two nodes have a working ICMP connection.
1. Use either `udaddy` or `ib_write_bw` to test that the RDMA connection is
   ready and has the desired bandwidth (a consolidated sketch of these
   commands follows this list).
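The node-side commands above, consolidated into a single sketch. The interface name `eth2` and the IP addresses reuse the example values from this list; adjust them for your machines:
```bash
# Install the RDMA driver package and restart the IB stack
# (run from the extracted MLNX_OFED directory; may take the network down).
./mlnxofedinstall --add-kernel-support
/etc/init.d/openibd restart

# Re-configure the network interface and the default route (example values).
ifconfig eth2 192.168.16.30/20 up
ip route add default via 192.168.16.1 dev eth2

# Basic connectivity and bandwidth checks against the peer node.
ping 192.168.16.34
ib_write_bw 192.168.16.34   # or use udaddy
```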
### Prepare Docker Image to Run RDMA Programs
1. Build a Docker image using a CUDA base image such as `nvidia/cuda:8.0-cudnn5-devel-ubuntu16.04` and install the PaddlePaddle whl
   package in it.
1. Start a docker container and mount GPU driver libs into it (you can
skip this step if you are using nvidia-docker).
1. Mount RDMA drivers and libs into the container (see the section below),
   and also `udaddy` and `ib_write_bw` if needed.
1. Mount GPU devices and RDMA devices into the container using `--device`,
   or just use privileged mode `--privileged`.
1. Start the container using host network mode: `--net=host` (see the
   `docker run` sketch after the library list below).
### RDMA Library Files Needed
Usually, `MLNX_OFED` installs the latest supported libs under
`/usr/lib64/mlnx_ofed/valgrind`. Other libs needed to run RDMA programs
are listed below. These libs must be mounted into the Docker container.
* Libs under `/usr/lib64/mlnx_ofed/valgrind`
* libibcm.so
* libibverbs.so
* libmlx4.so
* libmlx5.so
* libmlx5-rdmav2.so
* librdmacm.so
* Other libs:
* libnl-3.so.200
* libnl-route-3.so.200
* libnuma.so.1
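A sketch of starting such a container following the steps above. The image name `paddle-rdma:latest` and the host-side library paths are assumptions; adjust them to whatever your image build and `MLNX_OFED` installation produced:
```bash
# --privileged exposes the GPU and RDMA devices; use --device to mount them
# individually instead if you prefer a tighter sandbox.
docker run -it --net=host --privileged \
    -v /usr/lib64/mlnx_ofed/valgrind:/usr/lib64/mlnx_ofed/valgrind:ro \
    -v /usr/lib64/libnl-3.so.200:/usr/lib64/libnl-3.so.200:ro \
    -v /usr/lib64/libnl-route-3.so.200:/usr/lib64/libnl-route-3.so.200:ro \
    -v /usr/lib64/libnuma.so.1:/usr/lib64/libnuma.so.1:ro \
    paddle-rdma:latest /bin/bash
```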
## Start to Run the Training Job
Set the following NCCL environment variables to turn NCCL switches on and off (an example follows the table):
| Env Name | Description |
| --- | --- |
| NCCL_SOCKET_IFNAME | The RDMA device, e.g. eth2 |
| NCCL_P2P_DISABLE | Set to 1 to disable P2P transfer between GPUs |
| NCCL_IB_DISABLE | Set to 1 to disable using RDMA |
| NCCL_IB_CUDA_SUPPORT | Set to 1 to enable GPU Direct if supported |
| NCCL_DEBUG | Set debug level: VERSION, WARN, INFO |
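For example, these variables can be exported before launching the trainer on each node (the values shown are illustrative):
```bash
export NCCL_SOCKET_IFNAME=eth2   # the RDMA-backed network interface
export NCCL_IB_DISABLE=0         # keep RDMA enabled
export NCCL_IB_CUDA_SUPPORT=1    # enable GPU Direct if supported
export NCCL_DEBUG=INFO           # print NCCL setup information
```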
My two servers are `192.168.16.30` and `192.168.16.34`. On node 1, run:
```bash
PADDLE_TRAINER_ID=0 PADDLE_PORT=48372 PADDLE_WORKERS=192.168.16.30,192.168.16.34 POD_IP=192.168.16.30 stdbuf -oL python vgg16.py
```
On node 2, run:
```bash
PADDLE_TRAINER_ID=1 PADDLE_PORT=48372 PADDLE_WORKERS=192.168.16.30,192.168.16.34 POD_IP=192.168.16.34 stdbuf -oL python vgg16.py
```
...@@ -38,9 +38,7 @@ void BroadcastOpHandle::RunImpl() { ...@@ -38,9 +38,7 @@ void BroadcastOpHandle::RunImpl() {
out_var_handles.size(), places_.size(), out_var_handles.size(), places_.size(),
"The number of output should equal to the number of places."); "The number of output should equal to the number of places.");
// Wait input done, this Wait is asynchronous operation platform::Place WaitInputVarGenerated();
// &in_place;
WaitInputVarGenerated(*in_var_handle);
std::vector<const Scope *> var_scopes; std::vector<const Scope *> var_scopes;
for (auto *s : local_scopes_) { for (auto *s : local_scopes_) {
...@@ -50,29 +48,9 @@ void BroadcastOpHandle::RunImpl() { ...@@ -50,29 +48,9 @@ void BroadcastOpHandle::RunImpl() {
auto *in_var = auto *in_var =
var_scopes.at(in_var_handle->scope_idx_)->FindVar(in_var_handle->name_); var_scopes.at(in_var_handle->scope_idx_)->FindVar(in_var_handle->name_);
PADDLE_ENFORCE_NOT_NULL(in_var); PADDLE_ENFORCE_NOT_NULL(in_var);
Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var); Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var);
// NOTE: The tensors' Place of input and output must be all on GPU or all on InitOutputValue(*in_var_handle, out_var_handles);
// CPU.
for (auto *out_var_handle : out_var_handles) {
if (out_var_handle->IsTheSameVar(*in_var_handle)) {
continue;
}
auto t_out_p = out_var_handle->place_;
auto *out_var = var_scopes.at(out_var_handle->scope_idx_)
->FindVar(out_var_handle->name_);
PADDLE_ENFORCE_NOT_NULL(out_var);
if (platform::is_gpu_place(in_tensor.place())) {
PADDLE_ENFORCE(platform::is_gpu_place(t_out_p),
"Places of input and output must be all on GPU.");
} else {
t_out_p = platform::CPUPlace();
}
VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(t_out_p,
in_tensor.type());
}
if (platform::is_cpu_place(in_tensor.place())) { if (platform::is_cpu_place(in_tensor.place())) {
for (auto *out_var_handle : out_var_handles) { for (auto *out_var_handle : out_var_handles) {
...@@ -147,11 +125,37 @@ void BroadcastOpHandle::RunImpl() { ...@@ -147,11 +125,37 @@ void BroadcastOpHandle::RunImpl() {
} }
} }
void BroadcastOpHandle::WaitInputVarGenerated(const VarHandle &in_var) { void BroadcastOpHandle::InitOutputValue(
if (in_var.generated_op_) { const VarHandle &in_var_handle,
for (auto &pair : dev_ctxes_) { const std::vector<VarHandle *> &out_var_handles) const {
in_var.generated_op_->Wait(pair.second); std::vector<const Scope *> var_scopes;
for (auto *s : local_scopes_) {
var_scopes.emplace_back(s->FindVar(kLocalExecScopeName)->Get<Scope *>());
}
auto *in_var =
var_scopes.at(in_var_handle.scope_idx_)->FindVar(in_var_handle.name_);
Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var);
// NOTE: The tensors' Place of input and output must be all on GPU or all on
// CPU.
for (auto *out_var_handle : out_var_handles) {
if (out_var_handle->IsTheSameVar(in_var_handle)) {
continue;
}
auto t_out_p = out_var_handle->place_;
auto *out_var = var_scopes.at(out_var_handle->scope_idx_)
->FindVar(out_var_handle->name_);
PADDLE_ENFORCE_NOT_NULL(out_var);
if (is_gpu_place(in_tensor.place())) {
PADDLE_ENFORCE(platform::is_gpu_place(t_out_p),
"Places of input and output must be all on GPU.");
} else {
t_out_p = platform::CPUPlace();
} }
VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(t_out_p,
in_tensor.type());
} }
} }
......
...@@ -57,7 +57,6 @@ struct BroadcastOpHandle : public OpHandleBase { ...@@ -57,7 +57,6 @@ struct BroadcastOpHandle : public OpHandleBase {
protected: protected:
void RunImpl() override; void RunImpl() override;
void WaitInputVarGenerated(const VarHandle &in_var);
private: private:
const std::vector<Scope *> &local_scopes_; const std::vector<Scope *> &local_scopes_;
...@@ -65,6 +64,9 @@ struct BroadcastOpHandle : public OpHandleBase { ...@@ -65,6 +64,9 @@ struct BroadcastOpHandle : public OpHandleBase {
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
const platform::NCCLContextMap *nccl_ctxs_; const platform::NCCLContextMap *nccl_ctxs_;
#endif #endif
void InitOutputValue(const VarHandle &in_var_handle,
const std::vector<VarHandle *> &out_var_handles) const;
}; };
} // namespace details } // namespace details
} // namespace framework } // namespace framework
......
...@@ -26,20 +26,20 @@ ComputationOpHandle::ComputationOpHandle(const OpDesc &op_desc, Scope *scope, ...@@ -26,20 +26,20 @@ ComputationOpHandle::ComputationOpHandle(const OpDesc &op_desc, Scope *scope,
place_(place) {} place_(place) {}
void ComputationOpHandle::RunImpl() { void ComputationOpHandle::RunImpl() {
auto *cur_ctx = dev_ctxes_[place_]; WaitInputVarGenerated(place_);
for (auto *in : inputs_) {
bool need_wait = in->generated_op_ &&
in->generated_op_->DeviceContext(place_) != cur_ctx;
if (need_wait) {
in->generated_op_->Wait(cur_ctx);
}
}
this->RunAndRecordEvent([this] { this->RunAndRecordEvent([this] {
op_->Run(*scope_->FindVar(kLocalExecScopeName)->Get<Scope *>(), place_); op_->Run(*scope_->FindVar(kLocalExecScopeName)->Get<Scope *>(), place_);
}); });
} }
bool ComputationOpHandle::NeedWait(VarHandleBase *in_var) {
bool need_wait =
in_var && in_var->generated_op_ &&
in_var->generated_op_->DeviceContext(place_) != dev_ctxes_[place_];
return need_wait;
}
std::string ComputationOpHandle::Name() const { return op_->Type(); } std::string ComputationOpHandle::Name() const { return op_->Type(); }
} // namespace details } // namespace details
} // namespace framework } // namespace framework
......
...@@ -36,6 +36,8 @@ struct ComputationOpHandle : public OpHandleBase { ...@@ -36,6 +36,8 @@ struct ComputationOpHandle : public OpHandleBase {
protected: protected:
void RunImpl() override; void RunImpl() override;
virtual bool NeedWait(VarHandleBase *in_var);
private: private:
std::unique_ptr<OperatorBase> op_; std::unique_ptr<OperatorBase> op_;
Scope *scope_; Scope *scope_;
......
...@@ -31,7 +31,7 @@ FetchOpHandle::~FetchOpHandle() { ...@@ -31,7 +31,7 @@ FetchOpHandle::~FetchOpHandle() {
} }
} }
void FetchOpHandle::Wait(platform::DeviceContext *waited_dev) { void FetchOpHandle::RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx) {
PADDLE_THROW("Nobody should wait FetchOp. Unexpceted Error"); PADDLE_THROW("Nobody should wait FetchOp. Unexpceted Error");
} }
...@@ -45,14 +45,8 @@ void FetchOpHandle::WaitAndMergeCPUTensors() const { ...@@ -45,14 +45,8 @@ void FetchOpHandle::WaitAndMergeCPUTensors() const {
} }
void FetchOpHandle::RunImpl() { void FetchOpHandle::RunImpl() {
auto cpu_ctx = WaitInputVarGenerated(platform::CPUPlace());
platform::DeviceContextPool::Instance().Get(platform::CPUPlace());
for (auto *input : inputs_) {
auto *var = static_cast<VarHandle *>(input);
if (var->generated_op_) {
var->generated_op_->Wait(cpu_ctx);
}
}
tensors_.resize(inputs_.size()); tensors_.resize(inputs_.size());
auto *var_handle = static_cast<VarHandle *>(inputs_[0]); auto *var_handle = static_cast<VarHandle *>(inputs_[0]);
auto &var_name = var_handle->name_; auto &var_name = var_handle->name_;
...@@ -79,6 +73,15 @@ void FetchOpHandle::RunImpl() { ...@@ -79,6 +73,15 @@ void FetchOpHandle::RunImpl() {
this->WaitAndMergeCPUTensors(); this->WaitAndMergeCPUTensors();
} }
void FetchOpHandle::WaitInputVarGenerated(const platform::Place &place) {
auto cpu_ctx = platform::DeviceContextPool::Instance().Get(place);
for (auto *input : inputs_) {
if (input->generated_op_) {
input->generated_op_->RecordWaitEventOnCtx(cpu_ctx);
}
}
}
std::string FetchOpHandle::Name() const { return "Fetch"; } std::string FetchOpHandle::Name() const { return "Fetch"; }
} // namespace details } // namespace details
......
...@@ -33,7 +33,7 @@ struct FetchOpHandle : public OpHandleBase { ...@@ -33,7 +33,7 @@ struct FetchOpHandle : public OpHandleBase {
~FetchOpHandle(); ~FetchOpHandle();
void Wait(platform::DeviceContext *waited_dev) override; void RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx) override;
void WaitAndMergeCPUTensors() const; void WaitAndMergeCPUTensors() const;
...@@ -42,6 +42,8 @@ struct FetchOpHandle : public OpHandleBase { ...@@ -42,6 +42,8 @@ struct FetchOpHandle : public OpHandleBase {
protected: protected:
void RunImpl() override; void RunImpl() override;
virtual void WaitInputVarGenerated(const platform::Place &place);
private: private:
FeedFetchList *data_; FeedFetchList *data_;
size_t offset_; size_t offset_;
......
...@@ -55,7 +55,7 @@ void GatherOpHandle::RunImpl() { ...@@ -55,7 +55,7 @@ void GatherOpHandle::RunImpl() {
"Currently, gather_op only can gather SelectedRows."); "Currently, gather_op only can gather SelectedRows.");
// Wait input done, this Wait is asynchronous operation // Wait input done, this Wait is asynchronous operation
WaitInputVarGenerated(in_var_handles); WaitInputVarGenerated();
auto &pre_in_value = pre_in_var->Get<framework::SelectedRows>(); auto &pre_in_value = pre_in_var->Get<framework::SelectedRows>();
std::vector<int64_t> out_rows; std::vector<int64_t> out_rows;
...@@ -111,17 +111,6 @@ void GatherOpHandle::RunImpl() { ...@@ -111,17 +111,6 @@ void GatherOpHandle::RunImpl() {
}); });
} }
void GatherOpHandle::WaitInputVarGenerated(
const std::vector<VarHandle *> &in_var_handles) {
for (auto *in : in_var_handles) {
if (in->generated_op_) {
for (auto pair : dev_ctxes_) {
in->generated_op_->Wait(pair.second);
}
}
}
}
std::string GatherOpHandle::Name() const { return "gather"; } std::string GatherOpHandle::Name() const { return "gather"; }
} // namespace details } // namespace details
} // namespace framework } // namespace framework
......
...@@ -39,7 +39,6 @@ struct GatherOpHandle : public OpHandleBase { ...@@ -39,7 +39,6 @@ struct GatherOpHandle : public OpHandleBase {
protected: protected:
void RunImpl() override; void RunImpl() override;
void WaitInputVarGenerated(const std::vector<VarHandle *> &in_var_handles);
private: private:
const std::vector<Scope *> &local_scopes_; const std::vector<Scope *> &local_scopes_;
......
...@@ -34,12 +34,7 @@ void NCCLAllReduceOpHandle::RunImpl() { ...@@ -34,12 +34,7 @@ void NCCLAllReduceOpHandle::RunImpl() {
return; // No need to all reduce when GPU count = 1; return; // No need to all reduce when GPU count = 1;
} else { } else {
// Wait input done // Wait input done
for (auto *in : inputs_) { WaitInputVarGenerated();
auto &p = static_cast<VarHandle *>(in)->place_;
if (in->generated_op_) {
in->generated_op_->Wait(dev_ctxes_[p]);
}
}
auto &var_name = static_cast<VarHandle *>(this->inputs_[0])->name_; auto &var_name = static_cast<VarHandle *>(this->inputs_[0])->name_;
int dtype = -1; int dtype = -1;
......
...@@ -56,15 +56,15 @@ void OpHandleBase::Run(bool use_event) { ...@@ -56,15 +56,15 @@ void OpHandleBase::Run(bool use_event) {
RunImpl(); RunImpl();
} }
void OpHandleBase::Wait(platform::DeviceContext *waited_dev) { void OpHandleBase::RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx) {
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
if (platform::is_cpu_place(waited_dev->GetPlace()) || events_.empty()) { if (platform::is_cpu_place(waited_ctx->GetPlace()) || events_.empty()) {
for (auto &dev_ctx : dev_ctxes_) { for (auto &dev_ctx : dev_ctxes_) {
dev_ctx.second->Wait(); dev_ctx.second->Wait();
} }
} else { } else {
auto stream = auto stream =
static_cast<platform::CUDADeviceContext *>(waited_dev)->stream(); static_cast<platform::CUDADeviceContext *>(waited_ctx)->stream();
for (auto &ev : events_) { for (auto &ev : events_) {
PADDLE_ENFORCE(cudaStreamWaitEvent(stream, ev.second, 0)); PADDLE_ENFORCE(cudaStreamWaitEvent(stream, ev.second, 0));
} }
...@@ -86,6 +86,28 @@ void OpHandleBase::AddOutput(VarHandleBase *out) { ...@@ -86,6 +86,28 @@ void OpHandleBase::AddOutput(VarHandleBase *out) {
out->generated_op_ = this; out->generated_op_ = this;
} }
void OpHandleBase::WaitInputVarGenerated() {
for (auto in_var : inputs_) {
if (NeedWait(in_var)) {
for (auto &pair : dev_ctxes_) {
in_var->generated_op_->RecordWaitEventOnCtx(pair.second);
}
}
}
}
void OpHandleBase::WaitInputVarGenerated(const platform::Place &place) {
for (auto *in : inputs_) {
if (NeedWait(in)) {
in->generated_op_->RecordWaitEventOnCtx(dev_ctxes_[place]);
}
}
}
bool OpHandleBase::NeedWait(VarHandleBase *in_var) {
return in_var && in_var->generated_op_;
}
void OpHandleBase::RunAndRecordEvent(const std::function<void()> &callback) { void OpHandleBase::RunAndRecordEvent(const std::function<void()> &callback) {
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
if (!events_.empty()) { // Use event if (!events_.empty()) { // Use event
......
...@@ -38,12 +38,24 @@ class OpHandleBase { ...@@ -38,12 +38,24 @@ class OpHandleBase {
void Run(bool use_event); void Run(bool use_event);
virtual void Wait(platform::DeviceContext *waited_dev); virtual void RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx);
void AddInput(VarHandleBase *in); void AddInput(VarHandleBase *in);
void AddOutput(VarHandleBase *out); void AddOutput(VarHandleBase *out);
// This method adds wait events for all the inputs on all the device
// contexts.
// NOTE: This Wait is an asynchronous operation.
virtual void WaitInputVarGenerated();
// This method adds wait events for all the inputs on the specified device
// context.
// NOTE: This Wait is an asynchronous operation.
virtual void WaitInputVarGenerated(const platform::Place &place);
virtual bool NeedWait(VarHandleBase *in_var);
// If the Op involves data transfer of multiple devices that // If the Op involves data transfer of multiple devices that
// will likely block other computations. // will likely block other computations.
virtual bool IsMultiDeviceTransfer() { return false; } virtual bool IsMultiDeviceTransfer() { return false; }
......
...@@ -51,7 +51,7 @@ void ReduceOpHandle::RunImpl() { ...@@ -51,7 +51,7 @@ void ReduceOpHandle::RunImpl() {
PADDLE_ENFORCE_NOT_NULL(pre_in_var); PADDLE_ENFORCE_NOT_NULL(pre_in_var);
// Wait input done, this Wait is asynchronous operation // Wait input done, this Wait is asynchronous operation
WaitInputVarGenerated(in_var_handles); WaitInputVarGenerated();
// NOTE: The Places of all input tensor must be all on CPU or all on GPU. // NOTE: The Places of all input tensor must be all on CPU or all on GPU.
std::vector<platform::Place> in_places; // used to get dev_ctx std::vector<platform::Place> in_places; // used to get dev_ctx
...@@ -80,19 +80,21 @@ void ReduceOpHandle::RunImpl() { ...@@ -80,19 +80,21 @@ void ReduceOpHandle::RunImpl() {
} }
if (pre_in_var->IsType<framework::SelectedRows>()) { if (pre_in_var->IsType<framework::SelectedRows>()) {
this->RunAndRecordEvent([&] {
std::vector<const SelectedRows *> in_selected_rows = std::vector<const SelectedRows *> in_selected_rows =
GetInputValues<SelectedRows>(in_var_handles, var_scopes); GetInputValues<SelectedRows>(in_var_handles, var_scopes);
GatherSelectedRows(in_selected_rows, in_places, dev_ctxes_, t_out_p, GatherSelectedRows(in_selected_rows, in_places, dev_ctxes_, t_out_p,
out_var->GetMutable<framework::SelectedRows>()); out_var->GetMutable<framework::SelectedRows>());
});
} else { } else {
std::vector<const LoDTensor *> lod_tensors = std::vector<const LoDTensor *> lod_tensors =
GetInputValues<LoDTensor>(in_var_handles, var_scopes); GetInputValues<LoDTensor>(in_var_handles, var_scopes);
if (paddle::platform::is_cpu_place(lod_tensors[0]->place())) { if (paddle::platform::is_cpu_place(lod_tensors[0]->place())) {
this->RunAndRecordEvent([&] {
ReduceLoDTensor func(lod_tensors, ReduceLoDTensor func(lod_tensors,
out_var->GetMutable<framework::LoDTensor>()); out_var->GetMutable<framework::LoDTensor>());
VisitDataType(ToDataType(lod_tensors[0]->type()), func); VisitDataType(ToDataType(lod_tensors[0]->type()), func);
});
} else if (paddle::platform::is_gpu_place(lod_tensors[0]->place())) { } else if (paddle::platform::is_gpu_place(lod_tensors[0]->place())) {
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
auto pre_in = pre_in_var->Get<framework::LoDTensor>(); auto pre_in = pre_in_var->Get<framework::LoDTensor>();
...@@ -157,17 +159,6 @@ std::vector<const T *> ReduceOpHandle::GetInputValues( ...@@ -157,17 +159,6 @@ std::vector<const T *> ReduceOpHandle::GetInputValues(
return in_selected_rows; return in_selected_rows;
} }
void ReduceOpHandle::WaitInputVarGenerated(
const std::vector<VarHandle *> &in_var_handles) {
for (auto *in : in_var_handles) {
if (in->generated_op_) {
for (auto pair : dev_ctxes_) {
in->generated_op_->Wait(pair.second);
}
}
}
}
std::string ReduceOpHandle::Name() const { return "reduce"; } std::string ReduceOpHandle::Name() const { return "reduce"; }
} // namespace details } // namespace details
} // namespace framework } // namespace framework
......
...@@ -60,8 +60,6 @@ struct ReduceOpHandle : public OpHandleBase { ...@@ -60,8 +60,6 @@ struct ReduceOpHandle : public OpHandleBase {
protected: protected:
void RunImpl() override; void RunImpl() override;
void WaitInputVarGenerated(const std::vector<VarHandle *> &in_var_handles);
template <typename T> template <typename T>
std::vector<const T *> GetInputValues( std::vector<const T *> GetInputValues(
const std::vector<VarHandle *> &in_var_handles, const std::vector<VarHandle *> &in_var_handles,
......
...@@ -29,6 +29,7 @@ ScaleLossGradOpHandle::ScaleLossGradOpHandle(size_t num_dev, Scope *scope, ...@@ -29,6 +29,7 @@ ScaleLossGradOpHandle::ScaleLossGradOpHandle(size_t num_dev, Scope *scope,
ScaleLossGradOpHandle::~ScaleLossGradOpHandle() {} ScaleLossGradOpHandle::~ScaleLossGradOpHandle() {}
void ScaleLossGradOpHandle::RunImpl() { void ScaleLossGradOpHandle::RunImpl() {
// Doesn't wait any event
std::string var_name = static_cast<VarHandle *>(this->outputs_[0])->name_; std::string var_name = static_cast<VarHandle *>(this->outputs_[0])->name_;
auto &local_scope = *scope_->FindVar(kLocalExecScopeName)->Get<Scope *>(); auto &local_scope = *scope_->FindVar(kLocalExecScopeName)->Get<Scope *>();
......
...@@ -26,6 +26,7 @@ SendOpHandle::SendOpHandle(const framework::OpDesc &op_desc, ...@@ -26,6 +26,7 @@ SendOpHandle::SendOpHandle(const framework::OpDesc &op_desc,
place_(place) {} place_(place) {}
void SendOpHandle::RunImpl() { void SendOpHandle::RunImpl() {
// TODO(wuyi): need further analysis whether wait VarDummyHandle.
// Wait input done // Wait input done
for (auto *in : inputs_) { for (auto *in : inputs_) {
auto &p = static_cast<VarHandle *>(in)->place_; auto &p = static_cast<VarHandle *>(in)->place_;
...@@ -33,7 +34,7 @@ void SendOpHandle::RunImpl() { ...@@ -33,7 +34,7 @@ void SendOpHandle::RunImpl() {
continue; continue;
} }
if (in->generated_op_) { if (in->generated_op_) {
in->generated_op_->Wait(dev_ctxes_[p]); in->generated_op_->RecordWaitEventOnCtx(dev_ctxes_[p]);
} }
} }
auto &tmp_scope = local_scope_->FindVar(kLocalExecScopeName)->Get<Scope *>(); auto &tmp_scope = local_scope_->FindVar(kLocalExecScopeName)->Get<Scope *>();
......
...@@ -14,8 +14,6 @@ ...@@ -14,8 +14,6 @@
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h" #include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/fetch_op_handle.h"
namespace paddle { namespace paddle {
namespace framework { namespace framework {
namespace details { namespace details {
...@@ -45,73 +43,33 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -45,73 +43,33 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
// Should revisit it if overlapping is available. // Should revisit it if overlapping is available.
std::unordered_set<OpHandleBase *> delayed_ops; std::unordered_set<OpHandleBase *> delayed_ops;
auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) {
pending_vars.insert(&var);
if (var.generated_op_ == nullptr) {
ready_vars.Push(&var);
}
};
auto InsertPendingOp = [&pending_ops](OpHandleBase &op_instance) {
pending_ops.insert({&op_instance, op_instance.Inputs().size()});
};
// Transform SSAGraph to pending_ops & pending_vars // Transform SSAGraph to pending_ops & pending_vars
for (auto &var_map : graph_->vars_) { for (auto &var_map : graph_->vars_) {
for (auto &name_pair : var_map) { for (auto &name_pair : var_map) {
for (auto &version_pair : name_pair.second) { for (auto &version_pair : name_pair.second) {
InsertPendingVar(*version_pair); InsertPendingVar(&pending_vars, &ready_vars, version_pair.get());
} }
} }
} }
for (auto &var : graph_->dep_vars_) { for (auto &var : graph_->dep_vars_) {
InsertPendingVar(*var); InsertPendingVar(&pending_vars, &ready_vars, var.get());
} }
for (auto &op : graph_->ops_) { for (auto &op : graph_->ops_) {
if (op->Inputs().empty()) { // Special case, Op has no input. if (op->Inputs().empty()) { // Special case, Op has no input.
ready_ops.insert(op.get()); ready_ops.insert(op.get());
} else { } else {
InsertPendingOp(*op); InsertPendingOp(&pending_ops, op.get());
} }
} }
// Step 2. Insert FetchOps // Step 2. Insert FetchOps
std::vector<std::unique_ptr<FetchOpHandle>> fetch_ops; std::vector<std::unique_ptr<FetchOpHandle>> fetch_ops;
FeedFetchList fetch_data(fetch_tensors.size());
std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;
for (auto &fetch_var_name : fetch_tensors) {
for (auto &var_map : graph_->vars_) {
auto it = var_map.find(fetch_var_name);
if (it != var_map.end()) {
fetched_vars[fetch_var_name].push_back(it->second.rbegin()->get());
}
}
}
std::unordered_set<std::unique_ptr<VarHandleBase>> fetch_dependencies; std::unordered_set<std::unique_ptr<VarHandleBase>> fetch_dependencies;
for (size_t i = 0; i < fetch_tensors.size(); ++i) { FeedFetchList fetch_data(fetch_tensors.size());
auto &var_name = fetch_tensors[i];
auto &vars = fetched_vars.at(var_name);
auto *op = new FetchOpHandle(&fetch_data, i, &local_scopes_);
fetch_ops.emplace_back(op);
for (auto &p : places_) {
op->SetDeviceContext(p, fetch_ctxs_.Get(p));
}
for (auto *var : vars) {
op->AddInput(var);
}
auto *fetch_dummy = new DummyVarHandle(); InsertFetchOps(fetch_tensors, &fetch_ops, &fetch_dependencies, &pending_ops,
op->AddOutput(fetch_dummy); &pending_vars, &ready_vars, &fetch_data);
fetch_dependencies.emplace(fetch_dummy);
InsertPendingVar(*fetch_dummy);
InsertPendingOp(*op);
}
auto run_all_ops = [&](std::unordered_set<OpHandleBase *> &set) { auto run_all_ops = [&](std::unordered_set<OpHandleBase *> &set) {
for (auto *op : set) { for (auto *op : set) {
...@@ -174,6 +132,60 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -174,6 +132,60 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
return fetch_data; return fetch_data;
} }
void ThreadedSSAGraphExecutor::InsertFetchOps(
const std::vector<std::string> &fetch_tensors,
std::vector<std::unique_ptr<FetchOpHandle>> *fetch_ops,
std::unordered_set<std::unique_ptr<VarHandleBase>> *fetch_dependencies,
std::unordered_map<OpHandleBase *, size_t> *pending_ops,
std::unordered_set<VarHandleBase *> *pending_vars,
BlockingQueue<VarHandleBase *> *ready_vars, FeedFetchList *fetch_data) {
std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;
for (auto &fetch_var_name : fetch_tensors) {
for (auto &var_map : graph_->vars_) {
auto it = var_map.find(fetch_var_name);
if (it != var_map.end()) {
fetched_vars[fetch_var_name].push_back(it->second.rbegin()->get());
}
}
}
for (size_t i = 0; i < fetch_tensors.size(); ++i) {
auto &var_name = fetch_tensors[i];
auto &vars = fetched_vars.at(var_name);
auto *op = new FetchOpHandle(fetch_data, i, &local_scopes_);
fetch_ops->emplace_back(op);
for (auto &p : places_) {
op->SetDeviceContext(p, fetch_ctxs_.Get(p));
}
for (auto *var : vars) {
op->AddInput(var);
}
auto *fetch_dummy = new DummyVarHandle();
op->AddOutput(fetch_dummy);
fetch_dependencies->emplace(fetch_dummy);
this->InsertPendingVar(pending_vars, ready_vars, fetch_dummy);
this->InsertPendingOp(pending_ops, op);
}
}
void ThreadedSSAGraphExecutor::InsertPendingOp(
std::unordered_map<OpHandleBase *, size_t> *pending_ops,
OpHandleBase *op_instance) const {
pending_ops->insert({op_instance, op_instance->Inputs().size()});
}
void ThreadedSSAGraphExecutor::InsertPendingVar(
std::unordered_set<VarHandleBase *> *pending_vars,
BlockingQueue<VarHandleBase *> *ready_vars, VarHandleBase *var) const {
pending_vars->insert(var);
if (var->generated_op_ == nullptr) {
ready_vars->Push(var);
}
}
void ThreadedSSAGraphExecutor::RunOp( void ThreadedSSAGraphExecutor::RunOp(
BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) { BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) {
auto op_run = [ready_var_q, op, this] { auto op_run = [ready_var_q, op, this] {
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <functional> #include <functional>
#include "ThreadPool.h" // ThreadPool in thrird party #include "ThreadPool.h" // ThreadPool in thrird party
#include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/framework/details/fetch_op_handle.h"
#include "paddle/fluid/framework/details/ssa_graph_executor.h" #include "paddle/fluid/framework/details/ssa_graph_executor.h"
namespace paddle { namespace paddle {
...@@ -58,6 +59,21 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ...@@ -58,6 +59,21 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
std::unique_ptr<platform::EnforceNotMet> exception_; std::unique_ptr<platform::EnforceNotMet> exception_;
std::atomic<int> running_ops_; std::atomic<int> running_ops_;
bool allow_op_delay_; bool allow_op_delay_;
void InsertPendingOp(std::unordered_map<OpHandleBase *, size_t> *pending_ops,
OpHandleBase *op_instance) const;
void InsertPendingVar(std::unordered_set<VarHandleBase *> *pending_vars,
BlockingQueue<VarHandleBase *> *ready_vars,
VarHandleBase *var) const;
void InsertFetchOps(
const std::vector<std::string> &fetch_tensors,
std::vector<std::unique_ptr<FetchOpHandle>> *fetch_ops,
std::unordered_set<std::unique_ptr<VarHandleBase>> *fetch_dependencies,
std::unordered_map<OpHandleBase *, size_t> *pending_ops,
std::unordered_set<VarHandleBase *> *pending_vars,
BlockingQueue<VarHandleBase *> *ready_vars, FeedFetchList *fetch_data);
}; };
} // namespace details } // namespace details
......
...@@ -29,60 +29,26 @@ namespace paddle { ...@@ -29,60 +29,26 @@ namespace paddle {
namespace operators { namespace operators {
namespace detail { namespace detail {
void SerializeToByteBuffer(const std::string& name, framework::Variable* var, using VarMsg = sendrecv::VariableMessage;
const platform::DeviceContext& ctx,
::grpc::ByteBuffer* msg,
const std::string& out_name) {
using VarMsg = sendrecv::VariableMessage;
// When using GPU, need to free the copied CPU buffer
// when the ByteBuffer destroies
// TODO(typhoonzero): add unref here, if we have dependent
// parallelism execution, need to know when to free the tensor.
DestroyCallback destroy_callback = [](void* backing) {};
auto buffer = std::unique_ptr<char[]>(new char[1024]); void GetTensorPayload(framework::Variable* var,
void* buf = buffer.get(); const platform::DeviceContext& ctx, VarMsg* request,
void** payload, size_t* payload_size) {
void* payload = nullptr;
size_t payload_size;
ProtoEncodeHelper e(static_cast<char*>(buf), 1024);
// Note: normally the profiler is enabled in 1 trainer, hence only
// 1 trainer returns true for ShouldSendProfileState(). It tells PS
// servers the trainer's profiling state so that PS can follow the
// trainer.
if (platform::ShouldSendProfileState()) {
e.WriteBool(VarMsg::kProfileFieldNumber, platform::IsProfileEnabled());
}
e.WriteString(VarMsg::kVarnameFieldNumber, name);
if (var->IsType<framework::LoDTensor>()) {
e.WriteUint64(VarMsg::kTypeFieldNumber, 0);
} else if (var->IsType<framework::SelectedRows>()) {
e.WriteUint64(VarMsg::kTypeFieldNumber, 1);
}
if (!out_name.empty()) {
e.WriteString(VarMsg::kOutVarnameFieldNumber, out_name);
}
switch (framework::ToVarType(var->Type())) {
case framework::proto::VarType_Type_LOD_TENSOR: {
auto tensor = var->Get<framework::LoDTensor>(); auto tensor = var->Get<framework::LoDTensor>();
e.WriteUint64(VarMsg::kDataTypeFieldNumber, // FIXME(wuyi): data types in send_recv.proto is copied from
framework::ToDataType(tensor.type())); // framework.proto
request->set_data_type(
static_cast<VarMsg::Type>(framework::ToDataType(tensor.type())));
for (auto& dim : framework::vectorize(tensor.dims())) { for (auto& dim : framework::vectorize(tensor.dims())) {
e.WriteUint64(VarMsg::kDimsFieldNumber, dim); request->add_dims(dim);
} }
auto lod = tensor.lod(); // std::vector<Vector<size_t>> const framework::LoD lod = tensor.lod();
if (lod.size() > 0) { if (lod.size() > 0) {
e.WriteUint64(VarMsg::kLodLevelFieldNumber, lod.size()); request->set_lod_level(lod.size());
for (auto& each : lod) { for (auto& each : lod) {
e.WriteVarlengthBeginning(VarMsg::kLodFieldNumber, VarMsg::LodData* lod_inner = request->add_lod();
2 + // tag + varintlength of submessage
1 + // kLodDataFieldNumber
each.size());
// auto copied from GPU
for (auto& d : each) { for (auto& d : each) {
e.WriteUint64(VarMsg::LodData::kLodDataFieldNumber, d); lod_inner->add_lod_data(d);
} }
} }
} }
...@@ -90,68 +56,100 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, ...@@ -90,68 +56,100 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
PADDLE_ENFORCE(platform::is_gpu_place(tensor.place())); PADDLE_ENFORCE(platform::is_gpu_place(tensor.place()));
platform::CPUPlace cpu; platform::CPUPlace cpu;
auto& gpu_dev_ctx = auto& gpu_dev_ctx = static_cast<const platform::CUDADeviceContext&>(ctx);
static_cast<const platform::CUDADeviceContext&>(ctx);
auto copy_size = tensor.numel() * framework::SizeOfType(tensor.type()); auto copy_size = tensor.numel() * framework::SizeOfType(tensor.type());
payload = memory::Alloc(cpu, copy_size); *payload = memory::Alloc(cpu, copy_size);
memory::Copy(cpu, payload, memory::Copy(cpu, *payload, boost::get<platform::CUDAPlace>(tensor.place()),
boost::get<platform::CUDAPlace>(tensor.place()), reinterpret_cast<const void*>(tensor.data<void>()), copy_size,
reinterpret_cast<const void*>(tensor.data<void>()), gpu_dev_ctx.stream());
copy_size, gpu_dev_ctx.stream());
ctx.Wait(); ctx.Wait();
destroy_callback = [](void* backing) {
platform::CPUPlace cpu;
memory::Free(cpu, backing);
};
#endif #endif
} else { } else {
payload = tensor.data<void>(); *payload = tensor.data<void>();
} }
payload_size = tensor.numel() * framework::SizeOfType(tensor.type()); *payload_size = tensor.numel() * framework::SizeOfType(tensor.type());
e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size); }
} break;
case framework::proto::VarType_Type_SELECTED_ROWS: { void GetSelectedRowsPayload(framework::Variable* var,
// TODO(typhoonzero): selectedrows implement should not use unique_ptr const platform::DeviceContext& ctx, VarMsg* request,
void** payload, size_t* payload_size) {
auto* slr = var->GetMutable<framework::SelectedRows>(); auto* slr = var->GetMutable<framework::SelectedRows>();
e.WriteUint64(VarMsg::kDataTypeFieldNumber, request->set_data_type(
framework::ToDataType(slr->value().type())); static_cast<VarMsg::Type>(framework::ToDataType(slr->value().type())));
request->set_lod_level(0);
request->set_slr_height(slr->height());
for (auto& dim : framework::vectorize(slr->value().dims())) { for (auto& dim : framework::vectorize(slr->value().dims())) {
e.WriteUint64(VarMsg::kDimsFieldNumber, dim); request->add_dims(dim);
} }
e.WriteUint64(VarMsg::kLodLevelFieldNumber, 0);
e.WriteUint64(VarMsg::kSlrHeightFieldNumber, slr->height());
auto* tensor = slr->mutable_value(); auto* tensor = slr->mutable_value();
if (platform::is_gpu_place(ctx.GetPlace())) { if (platform::is_gpu_place(ctx.GetPlace())) {
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
platform::CPUPlace cpu; platform::CPUPlace cpu;
auto& gpu_dev_ctx = auto& gpu_dev_ctx = static_cast<const platform::CUDADeviceContext&>(ctx);
static_cast<const platform::CUDADeviceContext&>(ctx); auto copy_size = tensor->numel() * framework::SizeOfType(tensor->type());
auto copy_size = *payload = memory::Alloc(cpu, copy_size);
tensor->numel() * framework::SizeOfType(tensor->type()); memory::Copy(cpu, *payload,
payload = memory::Alloc(cpu, copy_size);
memory::Copy(cpu, payload,
boost::get<platform::CUDAPlace>(tensor->place()), boost::get<platform::CUDAPlace>(tensor->place()),
reinterpret_cast<const void*>(tensor->data<void>()), reinterpret_cast<const void*>(tensor->data<void>()), copy_size,
copy_size, gpu_dev_ctx.stream()); gpu_dev_ctx.stream());
ctx.Wait(); ctx.Wait();
destroy_callback = [](void* backing) {
platform::CPUPlace cpu;
memory::Free(cpu, backing);
};
#endif #endif
} else { } else {
payload = slr->mutable_value()->data<void>(); *payload = slr->mutable_value()->data<void>();
} }
payload_size = tensor->numel() * framework::SizeOfType(tensor->type()); *payload_size = tensor->numel() * framework::SizeOfType(tensor->type());
e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size); }
} break;
default: void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
const platform::DeviceContext& ctx,
::grpc::ByteBuffer* msg,
const std::string& out_name) {
// The default DestroyCallback does nothing. When using GPU,
// the CPU buffer needs to be freed.
DestroyCallback destroy_callback = [](void* backing) {};
VarMsg request;
void* payload = nullptr;
size_t payload_size;
request.set_varname(name);
// Note: normally the profiler is enabled in 1 trainer, hence only
// 1 trainer returns true for ShouldSendProfileState(). It tells PS
// servers the trainer's profiling state so that PS can follow the
// trainer.
request.set_profile(platform::IsProfileEnabled());
if (!out_name.empty()) {
request.set_out_varname(out_name);
}
if (var->IsType<framework::LoDTensor>()) {
request.set_type(::sendrecv::LOD_TENSOR);
GetTensorPayload(var, ctx, &request, &payload, &payload_size);
} else if (var->IsType<framework::SelectedRows>()) {
request.set_type(::sendrecv::SELECTED_ROWS);
GetSelectedRowsPayload(var, ctx, &request, &payload, &payload_size);
} else {
PADDLE_THROW("Serialize does not support type: %s", PADDLE_THROW("Serialize does not support type: %s",
typeid(var->Type()).name()); typeid(var->Type()).name());
break;
} }
if (platform::is_gpu_place(ctx.GetPlace())) {
// GPU data is copied to CPU buffer when sending,
// free the buffer when possible.
destroy_callback = [](void* backing) {
platform::CPUPlace cpu;
memory::Free(cpu, backing);
};
}
std::string header;
request.AppendToString(&header);
auto buffer = std::unique_ptr<char[]>(new char[1024]);
void* buf = buffer.get();
ProtoEncodeHelper e(static_cast<char*>(buf), 1024);
e.WriteRawBytes(std::string(header.data(), header.size()));
e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size);
// steal reference of tensor data // steal reference of tensor data
::grpc::Slice slices[4]; // metadata, tensor, rows meta, rows ::grpc::Slice slices[4]; // metadata, tensor, rows meta, rows
int num_slices = 2; // only SelectedRows have rows buffer int num_slices = 2; // only SelectedRows have rows buffer
...@@ -162,12 +160,9 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, ...@@ -162,12 +160,9 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
static_cast<char*>(payload)), static_cast<char*>(payload)),
::grpc::Slice::STEAL_REF); ::grpc::Slice::STEAL_REF);
if (framework::ToVarType(var->Type()) == if (var->IsType<framework::SelectedRows>()) {
framework::proto::VarType_Type_SELECTED_ROWS) {
auto* slr = var->GetMutable<framework::SelectedRows>(); auto* slr = var->GetMutable<framework::SelectedRows>();
ProtoEncodeHelper e2(static_cast<char*>(buf), 128); ProtoEncodeHelper e2(static_cast<char*>(buf), 128);
// NOTE: rows is of type int64_t
size_t rows_memory_size = size_t rows_memory_size =
slr->rows().size() * framework::SizeOfType(typeid(int64_t)); slr->rows().size() * framework::SizeOfType(typeid(int64_t));
e2.WriteVarlengthBeginning(VarMsg::kRowsFieldNumber, rows_memory_size); e2.WriteVarlengthBeginning(VarMsg::kRowsFieldNumber, rows_memory_size);
...@@ -178,10 +173,7 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, ...@@ -178,10 +173,7 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
grpc_slice_new_with_user_data( grpc_slice_new_with_user_data(
const_cast<void*>( const_cast<void*>(
reinterpret_cast<const void*>(slr->rows().data())), reinterpret_cast<const void*>(slr->rows().data())),
rows_memory_size, rows_memory_size, [](void* backing) {},
[](void* backing) {
// TODO(typhoonzero): add unref here, same as above.
},
const_cast<char*>( const_cast<char*>(
reinterpret_cast<const char*>(slr->rows().data()))), reinterpret_cast<const char*>(slr->rows().data()))),
::grpc::Slice::STEAL_REF); ::grpc::Slice::STEAL_REF);
......
...@@ -117,11 +117,11 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) { ...@@ -117,11 +117,11 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) {
// serialize var to ByteBuffer // serialize var to ByteBuffer
framework::Variable var; framework::Variable var;
auto* tensor = var.GetMutable<framework::LoDTensor>(); auto* tensor = var.GetMutable<framework::LoDTensor>();
tensor->Resize(framework::make_ddim({4, 8, 4, 2})); tensor->Resize(framework::make_ddim({512, 8, 4, 2}));
framework::LoD lod; framework::LoD lod;
lod.push_back(framework::Vector<size_t>({1, 3, 8})); lod.push_back(framework::Vector<size_t>({1, 3, 8}));
tensor->set_lod(lod); tensor->set_lod(lod);
int tensor_numel = 4 * 8 * 4 * 2; int tensor_numel = 512 * 8 * 4 * 2;
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(place); auto& ctx = *pool.Get(place);
tensor->mutable_data<float>(place); tensor->mutable_data<float>(place);
...@@ -142,7 +142,7 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) { ...@@ -142,7 +142,7 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) {
EXPECT_TRUE(varmsg.ParseFromString(tmp)); EXPECT_TRUE(varmsg.ParseFromString(tmp));
EXPECT_EQ(varmsg.varname(), "myvar"); EXPECT_EQ(varmsg.varname(), "myvar");
EXPECT_EQ(varmsg.type(), 0); EXPECT_EQ(varmsg.type(), 0);
EXPECT_EQ(varmsg.dims()[0], 4); EXPECT_EQ(varmsg.dims()[0], 512);
EXPECT_EQ(varmsg.dims()[1], 8); EXPECT_EQ(varmsg.dims()[1], 8);
EXPECT_EQ(varmsg.dims()[2], 4); EXPECT_EQ(varmsg.dims()[2], 4);
EXPECT_EQ(varmsg.dims()[3], 2); EXPECT_EQ(varmsg.dims()[3], 2);
......
...@@ -210,15 +210,15 @@ bool ParseLodData(::google::protobuf::io::CodedInputStream* input, ...@@ -210,15 +210,15 @@ bool ParseLodData(::google::protobuf::io::CodedInputStream* input,
} }
if (wt == WIRETYPE_LENGTH_DELIMITED) { if (wt == WIRETYPE_LENGTH_DELIMITED) {
int length = 0; int num_bytes = 0;
if (!input->ReadVarintSizeAsInt(&length)) { if (!input->ReadVarintSizeAsInt(&num_bytes)) {
return tag; return tag;
} }
int start_pos = input->CurrentPosition();
for (int i = 0; i < length; i++) { while (input->CurrentPosition() - start_pos < num_bytes) {
uint64_t v; uint64_t v;
if (!input->ReadVarint64(&v)) { if (!input->ReadVarint64(&v)) {
return false; return tag;
} }
lod->push_back(v); lod->push_back(v);
} }
...@@ -275,8 +275,8 @@ int VariableResponse::Parse(Source* source) { ...@@ -275,8 +275,8 @@ int VariableResponse::Parse(Source* source) {
break; break;
} }
case sendrecv::VariableMessage::kTypeFieldNumber: { case sendrecv::VariableMessage::kTypeFieldNumber: {
uint64_t v; uint32_t v;
if ((wt != WIRETYPE_VARINT) || !input.ReadVarint64(&v)) { if ((wt != WIRETYPE_VARINT) || !input.ReadVarint32(&v)) {
return tag; return tag;
} }
...@@ -284,8 +284,8 @@ int VariableResponse::Parse(Source* source) { ...@@ -284,8 +284,8 @@ int VariableResponse::Parse(Source* source) {
break; break;
} }
case sendrecv::VariableMessage::kDataTypeFieldNumber: { case sendrecv::VariableMessage::kDataTypeFieldNumber: {
uint64_t v = 0; uint32_t v = 0;
if ((wt != WIRETYPE_VARINT) || !input.ReadVarint64(&v)) { if ((wt != WIRETYPE_VARINT) || !input.ReadVarint32(&v)) {
return tag; return tag;
} }
...@@ -305,11 +305,12 @@ int VariableResponse::Parse(Source* source) { ...@@ -305,11 +305,12 @@ int VariableResponse::Parse(Source* source) {
// packed // packed
if (wt == WIRETYPE_LENGTH_DELIMITED) { if (wt == WIRETYPE_LENGTH_DELIMITED) {
int length = 0; int num_bytes = 0;
if (!input.ReadVarintSizeAsInt(&length)) { if (!input.ReadVarintSizeAsInt(&num_bytes)) {
return tag; return tag;
} }
for (int i = 0; i < length; i++) { int start_pos = input.CurrentPosition();
while (input.CurrentPosition() - start_pos < num_bytes) {
uint64_t v; uint64_t v;
if (!input.ReadVarint64(&v)) { if (!input.ReadVarint64(&v)) {
return tag; return tag;
...@@ -318,7 +319,6 @@ int VariableResponse::Parse(Source* source) { ...@@ -318,7 +319,6 @@ int VariableResponse::Parse(Source* source) {
} }
break; break;
} }
return tag; return tag;
} }
case sendrecv::VariableMessage::kLodLevelFieldNumber: { case sendrecv::VariableMessage::kLodLevelFieldNumber: {
...@@ -372,9 +372,9 @@ int VariableResponse::Parse(Source* source) { ...@@ -372,9 +372,9 @@ int VariableResponse::Parse(Source* source) {
meta_.varname() != "", meta_.varname() != "",
"meta info should be got first!"); "meta info should be got first!");
int length = 0; int num_bytes = 0;
if (wt != WIRETYPE_LENGTH_DELIMITED || if (wt != WIRETYPE_LENGTH_DELIMITED ||
!ReadVarintSizeAsInt(&input, &length)) { !ReadVarintSizeAsInt(&input, &num_bytes)) {
return tag; return tag;
} }
...@@ -382,14 +382,14 @@ int VariableResponse::Parse(Source* source) { ...@@ -382,14 +382,14 @@ int VariableResponse::Parse(Source* source) {
if (meta_.type() == sendrecv::LOD_TENSOR) { if (meta_.type() == sendrecv::LOD_TENSOR) {
PADDLE_ENFORCE(meta_.lod_size() >= 0, PADDLE_ENFORCE(meta_.lod_size() >= 0,
"lod info should be got first!"); "lod info should be got first!");
if (!CopyLodTensorData(&input, *dev_ctx_, dims, length)) { if (!CopyLodTensorData(&input, *dev_ctx_, dims, num_bytes)) {
return tag; return tag;
} }
break; break;
} }
if (meta_.type() == sendrecv::SELECTED_ROWS) { if (meta_.type() == sendrecv::SELECTED_ROWS) {
if (!CopySelectRowsTensorData(&input, *dev_ctx_, dims, length)) { if (!CopySelectRowsTensorData(&input, *dev_ctx_, dims, num_bytes)) {
return tag; return tag;
} }
break; break;
...@@ -403,13 +403,13 @@ int VariableResponse::Parse(Source* source) { ...@@ -403,13 +403,13 @@ int VariableResponse::Parse(Source* source) {
meta_.varname() != "", meta_.varname() != "",
"meta info should be got first!"); "meta info should be got first!");
int length = 0; int num_bytes = 0;
if (wt != WIRETYPE_LENGTH_DELIMITED || if (wt != WIRETYPE_LENGTH_DELIMITED ||
!ReadVarintSizeAsInt(&input, &length)) { !ReadVarintSizeAsInt(&input, &num_bytes)) {
return tag; return tag;
} }
if (!CopySelectRowsData(&input, *dev_ctx_, length)) { if (!CopySelectRowsData(&input, *dev_ctx_, num_bytes)) {
return tag; return tag;
} }
break; break;
......
...@@ -18,6 +18,7 @@ limitations under the License. */ ...@@ -18,6 +18,7 @@ limitations under the License. */
#include <numeric> #include <numeric>
#include <sstream> #include <sstream>
#include "paddle/fluid/framework/data_type.h" #include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/data_type_transform.h"
#include "paddle/fluid/framework/framework.pb.h" #include "paddle/fluid/framework/framework.pb.h"
#include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
...@@ -69,6 +70,7 @@ class SaveCombineOp : public framework::OperatorBase { ...@@ -69,6 +70,7 @@ class SaveCombineOp : public framework::OperatorBase {
const platform::Place &place) const override { const platform::Place &place) const override {
auto filename = Attr<std::string>("file_path"); auto filename = Attr<std::string>("file_path");
auto overwrite = Attr<bool>("overwrite"); auto overwrite = Attr<bool>("overwrite");
auto save_as_fp16 = Attr<bool>("save_as_fp16");
bool is_present = FileExists(filename); bool is_present = FileExists(filename);
if (is_present && !overwrite) { if (is_present && !overwrite) {
...@@ -100,9 +102,25 @@ class SaveCombineOp : public framework::OperatorBase { ...@@ -100,9 +102,25 @@ class SaveCombineOp : public framework::OperatorBase {
inp_var_names[i]); inp_var_names[i]);
auto &tensor = var->Get<framework::LoDTensor>(); auto &tensor = var->Get<framework::LoDTensor>();
// Serialize tensor // Serialize tensors one by one
// Check types to see if a fp16 transformation is required
auto in_dtype = framework::ToDataType(tensor.type());
auto out_dtype =
save_as_fp16 ? framework::proto::VarType::FP16 : in_dtype;
if (in_dtype != out_dtype) {
auto in_kernel_type = framework::OpKernelType(in_dtype, place);
auto out_kernel_type = framework::OpKernelType(out_dtype, place);
framework::LoDTensor out;
// copy LoD info to the new tensor
out.set_lod(tensor.lod());
framework::TransDataType(in_kernel_type, out_kernel_type, tensor, &out);
framework::SerializeToStream(fout, out, dev_ctx);
} else {
framework::SerializeToStream(fout, tensor, dev_ctx); framework::SerializeToStream(fout, tensor, dev_ctx);
} }
}
fout.close(); fout.close();
} }
}; };
...@@ -125,6 +143,12 @@ to a file on disk. ...@@ -125,6 +143,12 @@ to a file on disk.
"(boolean, default true)" "(boolean, default true)"
"Overwrite the output file if it exists.") "Overwrite the output file if it exists.")
.SetDefault(true); .SetDefault(true);
AddAttr<bool>("save_as_fp16",
"(boolean, default false)"
"If true, the tensor will be converted to float16 data "
"type and then saved. Otherwise, the tensor will be "
"directly saved without data type conversion.")
.SetDefault(false);
AddAttr<std::string>( AddAttr<std::string>(
"file_path", "file_path",
"(string)" "(string)"
......
...@@ -17,11 +17,13 @@ limitations under the License. */ ...@@ -17,11 +17,13 @@ limitations under the License. */
#include <vector> #include <vector>
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/platform/float16.h"
USE_NO_KERNEL_OP(save_combine); USE_NO_KERNEL_OP(save_combine);
USE_NO_KERNEL_OP(load_combine); USE_NO_KERNEL_OP(load_combine);
int* CreateForSaveCombineOp(int x, int y, const std::vector<int>& lod_info, template <typename T, typename U>
T* CreateForSaveCombineOp(int x, int y, const std::vector<int>& lod_info,
std::string var_name, std::string var_name,
const paddle::platform::CPUPlace& place, const paddle::platform::CPUPlace& place,
paddle::framework::Scope* scope, paddle::framework::Scope* scope,
...@@ -34,9 +36,10 @@ int* CreateForSaveCombineOp(int x, int y, const std::vector<int>& lod_info, ...@@ -34,9 +36,10 @@ int* CreateForSaveCombineOp(int x, int y, const std::vector<int>& lod_info,
(*expect_lod)[0].push_back(lod_info[i]); (*expect_lod)[0].push_back(lod_info[i]);
} }
tensor->set_lod(*expect_lod); tensor->set_lod(*expect_lod);
int* expect = tensor->mutable_data<int>(place); T* expect = tensor->mutable_data<T>(place);
for (int64_t i = 0; i < tensor->numel(); ++i) { for (int64_t i = 0; i < tensor->numel(); ++i) {
expect[i] = static_cast<int>(i); expect[i] = static_cast<T>(
static_cast<U>(i)); // For FP16, we intend to do float(float16(i))
} }
return expect; return expect;
} }
...@@ -48,18 +51,20 @@ paddle::framework::LoDTensor* GeneratePlaceholderBeforeLoad( ...@@ -48,18 +51,20 @@ paddle::framework::LoDTensor* GeneratePlaceholderBeforeLoad(
return target; return target;
} }
int* GetValuesAfterLoadCombineOp(paddle::framework::LoDTensor* target, template <typename T>
T* GetValuesAfterLoadCombineOp(paddle::framework::LoDTensor* target,
const paddle::framework::Scope& scope, const paddle::framework::Scope& scope,
paddle::framework::LoD* actual_lod) { paddle::framework::LoD* actual_lod) {
int* actual = target->data<int>(); T* actual = target->data<T>();
*actual_lod = target->lod(); *actual_lod = target->lod();
return actual; return actual;
} }
void CheckValues(int* expect, int* actual, paddle::framework::LoD expect_lod, template <typename T, typename U>
paddle::framework::LoD actual_lod, const int& numel) { void CheckValues(T* expect, U* actual, const paddle::framework::LoD& expect_lod,
for (int64_t i = 0; i < numel; ++i) { const paddle::framework::LoD& actual_lod, const int& numel) {
EXPECT_EQ(expect[i], actual[i]); for (int i = 0; i < numel; ++i) {
EXPECT_EQ(expect[i], static_cast<T>(actual[i]));
} }
EXPECT_EQ(expect_lod.size(), actual_lod.size()); EXPECT_EQ(expect_lod.size(), actual_lod.size());
for (size_t i = 0; i < expect_lod.size(); ++i) { for (size_t i = 0; i < expect_lod.size(); ++i) {
...@@ -78,26 +83,26 @@ TEST(SaveLoadCombineOp, CPU) { ...@@ -78,26 +83,26 @@ TEST(SaveLoadCombineOp, CPU) {
std::vector<int> lod1 = {0, 1, 2, 3, 10}; std::vector<int> lod1 = {0, 1, 2, 3, 10};
int numel1 = 100; int numel1 = 100;
paddle::framework::LoD expect_lod1; paddle::framework::LoD expect_lod1;
int* expect1 = CreateForSaveCombineOp(10, 10, lod1, "test_var1", place, int* expect1 = CreateForSaveCombineOp<int, int>(10, 10, lod1, "test_var1",
&scope, &expect_lod1); place, &scope, &expect_lod1);
std::vector<int> lod2 = {0, 2, 5, 10}; std::vector<int> lod2 = {0, 2, 5, 10};
int numel2 = 200; int numel2 = 200;
paddle::framework::LoD expect_lod2; paddle::framework::LoD expect_lod2;
int* expect2 = CreateForSaveCombineOp(10, 20, lod2, "test_var2", place, int* expect2 = CreateForSaveCombineOp<int, int>(10, 20, lod2, "test_var2",
&scope, &expect_lod2); place, &scope, &expect_lod2);
std::vector<int> lod3 = {0, 2, 3, 20}; std::vector<int> lod3 = {0, 2, 3, 20};
int numel3 = 4000; int numel3 = 4000;
paddle::framework::LoD expect_lod3; paddle::framework::LoD expect_lod3;
int* expect3 = CreateForSaveCombineOp(20, 200, lod3, "test_var3", place, int* expect3 = CreateForSaveCombineOp<int, int>(20, 200, lod3, "test_var3",
&scope, &expect_lod3); place, &scope, &expect_lod3);
std::vector<int> lod4 = {0, 1, 20}; std::vector<int> lod4 = {0, 1, 20};
int numel4 = 1000; int numel4 = 1000;
paddle::framework::LoD expect_lod4; paddle::framework::LoD expect_lod4;
int* expect4 = CreateForSaveCombineOp(20, 50, lod4, "test_var4", place, int* expect4 = CreateForSaveCombineOp<int, int>(20, 50, lod4, "test_var4",
&scope, &expect_lod4); place, &scope, &expect_lod4);
// Set attributes // Set attributes
std::string filename = "check_tensor.ls"; std::string filename = "check_tensor.ls";
...@@ -123,15 +128,92 @@ TEST(SaveLoadCombineOp, CPU) { ...@@ -123,15 +128,92 @@ TEST(SaveLoadCombineOp, CPU) {
load_combine_op->Run(scope, place); load_combine_op->Run(scope, place);
paddle::framework::LoD actual_lod1, actual_lod2, actual_lod3, actual_lod4; paddle::framework::LoD actual_lod1, actual_lod2, actual_lod3, actual_lod4;
int* actual1 = GetValuesAfterLoadCombineOp(target1, scope, &actual_lod1); int* actual1 = GetValuesAfterLoadCombineOp<int>(target1, scope, &actual_lod1);
int* actual2 = GetValuesAfterLoadCombineOp(target2, scope, &actual_lod2); int* actual2 = GetValuesAfterLoadCombineOp<int>(target2, scope, &actual_lod2);
int* actual3 = GetValuesAfterLoadCombineOp(target3, scope, &actual_lod3); int* actual3 = GetValuesAfterLoadCombineOp<int>(target3, scope, &actual_lod3);
int* actual4 = GetValuesAfterLoadCombineOp(target4, scope, &actual_lod4); int* actual4 = GetValuesAfterLoadCombineOp<int>(target4, scope, &actual_lod4);
CheckValues(expect1, actual1, expect_lod1, actual_lod1, numel1); CheckValues<int, int>(expect1, actual1, expect_lod1, actual_lod1, numel1);
CheckValues(expect2, actual2, expect_lod2, actual_lod2, numel2); CheckValues<int, int>(expect2, actual2, expect_lod2, actual_lod2, numel2);
CheckValues(expect3, actual3, expect_lod3, actual_lod3, numel3); CheckValues<int, int>(expect3, actual3, expect_lod3, actual_lod3, numel3);
CheckValues(expect4, actual4, expect_lod4, actual_lod4, numel4); CheckValues<int, int>(expect4, actual4, expect_lod4, actual_lod4, numel4);
}
// FP16 version of SaveLoadCombineOp Test
TEST(SaveLoadCombineFP16Op, CPU) {
paddle::framework::Scope scope;
paddle::platform::CPUPlace place;
std::vector<int> lod1 = {0, 1, 2, 3, 10};
int numel1 = 100;
paddle::framework::LoD expect_lod1;
float* expect1 = CreateForSaveCombineOp<float, paddle::platform::float16>(
10, 10, lod1, "test_var1", place, &scope, &expect_lod1);
std::vector<int> lod2 = {0, 2, 5, 10};
int numel2 = 200;
paddle::framework::LoD expect_lod2;
float* expect2 = CreateForSaveCombineOp<float, paddle::platform::float16>(
10, 20, lod2, "test_var2", place, &scope, &expect_lod2);
std::vector<int> lod3 = {0, 20};
int numel3 = 4000;
paddle::framework::LoD expect_lod3;
float* expect3 = CreateForSaveCombineOp<float, paddle::platform::float16>(
20, 200, lod3, "test_var3", place, &scope, &expect_lod3);
std::vector<int> lod4 = {0, 1, 20};
int numel4 = 1000;
paddle::framework::LoD expect_lod4;
float* expect4 = CreateForSaveCombineOp<float, paddle::platform::float16>(
20, 50, lod4, "test_var4", place, &scope, &expect_lod4);
// Set attributes
std::string filename = "check_tensor_fp16.ls";
paddle::framework::AttributeMap attrs;
attrs.insert({"file_path", std::string(filename)});
attrs.insert({"save_as_fp16", true});
// Run the save_combine_op
auto save_combine_op = paddle::framework::OpRegistry::CreateOp(
"save_combine",
{{"X", {"test_var1", "test_var2", "test_var3", "test_var4"}}}, {}, attrs);
save_combine_op->Run(scope, place);
// Set up output vars
auto target1 = GeneratePlaceholderBeforeLoad("out_var1", &scope);
auto target2 = GeneratePlaceholderBeforeLoad("out_var2", &scope);
auto target3 = GeneratePlaceholderBeforeLoad("out_var3", &scope);
auto target4 = GeneratePlaceholderBeforeLoad("out_var4", &scope);
// Run the load_combine_op
auto load_combine_op = paddle::framework::OpRegistry::CreateOp(
"load_combine", {},
{{"Out", {"out_var1", "out_var2", "out_var3", "out_var4"}}}, attrs);
load_combine_op->Run(scope, place);
paddle::framework::LoD actual_lod1, actual_lod2, actual_lod3, actual_lod4;
paddle::platform::float16* actual1 =
GetValuesAfterLoadCombineOp<paddle::platform::float16>(target1, scope,
&actual_lod1);
paddle::platform::float16* actual2 =
GetValuesAfterLoadCombineOp<paddle::platform::float16>(target2, scope,
&actual_lod2);
paddle::platform::float16* actual3 =
GetValuesAfterLoadCombineOp<paddle::platform::float16>(target3, scope,
&actual_lod3);
paddle::platform::float16* actual4 =
GetValuesAfterLoadCombineOp<paddle::platform::float16>(target4, scope,
&actual_lod4);
CheckValues<float, paddle::platform::float16>(expect1, actual1, expect_lod1,
actual_lod1, numel1);
CheckValues<float, paddle::platform::float16>(expect2, actual2, expect_lod2,
actual_lod2, numel2);
CheckValues<float, paddle::platform::float16>(expect3, actual3, expect_lod3,
actual_lod3, numel3);
CheckValues<float, paddle::platform::float16>(expect4, actual4, expect_lod4,
actual_lod4, numel4);
} }
// Test with original SaveLoadTest // Test with original SaveLoadTest
...@@ -141,7 +223,7 @@ TEST(SaveLoadTestWithCombineOp, CPU) { ...@@ -141,7 +223,7 @@ TEST(SaveLoadTestWithCombineOp, CPU) {
auto var = scope.Var("test_var"); auto var = scope.Var("test_var");
auto tensor = var->GetMutable<paddle::framework::LoDTensor>(); auto tensor = var->GetMutable<paddle::framework::LoDTensor>();
tensor->Resize({3, 10}); tensor->Resize({3, 4000});
paddle::framework::LoD expect_lod; paddle::framework::LoD expect_lod;
expect_lod.resize(1); expect_lod.resize(1);
expect_lod[0].push_back(0); expect_lod[0].push_back(0);
......
...@@ -70,7 +70,14 @@ TEST(SaveFP16Op, CPU) { ...@@ -70,7 +70,14 @@ TEST(SaveFP16Op, CPU) {
auto var = scope.Var("test_var"); auto var = scope.Var("test_var");
auto tensor = var->GetMutable<paddle::framework::LoDTensor>(); auto tensor = var->GetMutable<paddle::framework::LoDTensor>();
tensor->Resize({3, 10}); tensor->Resize({3, 10});
paddle::framework::LoD expect_lod;
expect_lod.resize(1);
expect_lod[0].push_back(0);
expect_lod[0].push_back(1);
expect_lod[0].push_back(2);
expect_lod[0].push_back(3);
tensor->set_lod(expect_lod);
float* expect = tensor->mutable_data<float>(place); float* expect = tensor->mutable_data<float>(place);
for (int64_t i = 0; i < tensor->numel(); ++i) { for (int64_t i = 0; i < tensor->numel(); ++i) {
expect[i] = static_cast<float>(paddle::platform::float16(i)); expect[i] = static_cast<float>(paddle::platform::float16(i));
...@@ -93,6 +100,13 @@ TEST(SaveFP16Op, CPU) { ...@@ -93,6 +100,13 @@ TEST(SaveFP16Op, CPU) {
for (int64_t i = 0; i < tensor->numel(); ++i) { for (int64_t i = 0; i < tensor->numel(); ++i) {
EXPECT_EQ(expect[i], static_cast<float>(actual[i])); EXPECT_EQ(expect[i], static_cast<float>(actual[i]));
} }
auto& actual_lod = target->lod();
EXPECT_EQ(expect_lod.size(), actual_lod.size());
for (size_t i = 0; i < expect_lod.size(); ++i) {
for (size_t j = 0; j < expect_lod[i].size(); ++j) {
EXPECT_EQ(expect_lod[i][j], actual_lod[i][j]);
}
}
} }
TEST(LoadFP16Op, CPU) { TEST(LoadFP16Op, CPU) {
......
...@@ -473,6 +473,7 @@ EOF ...@@ -473,6 +473,7 @@ EOF
} }
function main() { function main() {
set -e
local CMD=$1 local CMD=$1
init init
case $CMD in case $CMD in
......
...@@ -59,7 +59,7 @@ EOL ...@@ -59,7 +59,7 @@ EOL
if [ ! -d "${HOME}/.ccache" ]; then if [ ! -d "${HOME}/.ccache" ]; then
mkdir ${HOME}/.ccache mkdir ${HOME}/.ccache
fi fi
set -x set -ex
${DOCKER_CMD} run -it \ ${DOCKER_CMD} run -it \
--name $CONTAINER_ID \ --name $CONTAINER_ID \
${DOCKER_ENV} \ ${DOCKER_ENV} \
......
...@@ -160,6 +160,7 @@ class Variable(object): ...@@ -160,6 +160,7 @@ class Variable(object):
persistable=None, persistable=None,
error_clip=None, error_clip=None,
stop_gradient=False, stop_gradient=False,
is_data=False,
**kwargs): **kwargs):
self.block = block self.block = block
self.error_clip = error_clip self.error_clip = error_clip
...@@ -238,6 +239,7 @@ class Variable(object): ...@@ -238,6 +239,7 @@ class Variable(object):
self.block.vars[name] = self self.block.vars[name] = self
self.op = None self.op = None
self.stop_gradient = stop_gradient self.stop_gradient = stop_gradient
self.is_data = is_data
def __str__(self): def __str__(self):
return self.to_string(True) return self.to_string(True)
...@@ -978,7 +980,8 @@ class Block(object): ...@@ -978,7 +980,8 @@ class Block(object):
shape=var.shape, shape=var.shape,
dtype=var.dtype, dtype=var.dtype,
type=var.type, type=var.type,
persistable=True) persistable=True,
is_data=var.is_data)
else: else:
ret_var = self.create_var( ret_var = self.create_var(
name=var.name, name=var.name,
...@@ -986,7 +989,8 @@ class Block(object): ...@@ -986,7 +989,8 @@ class Block(object):
dtype=var.dtype, dtype=var.dtype,
type=var.type, type=var.type,
lod_level=var.lod_level, lod_level=var.lod_level,
persistable=True) persistable=True,
is_data=var.is_data)
return ret_var return ret_var
...@@ -1051,6 +1055,7 @@ class Program(object): ...@@ -1051,6 +1055,7 @@ class Program(object):
p.sync_with_cpp() p.sync_with_cpp()
p.copy_param_info_from(self) p.copy_param_info_from(self)
p.copy_data_info_from(self)
return p return p
def prune(self, targets): def prune(self, targets):
...@@ -1172,6 +1177,26 @@ class Program(object): ...@@ -1172,6 +1177,26 @@ class Program(object):
"program, with represent the same topology") "program, with represent the same topology")
self.global_block().copy_param_info_from(other.global_block()) self.global_block().copy_param_info_from(other.global_block())
def copy_data_info_from(self, other):
"""
Copy the is_data attribute of data variables from the other program.
Args:
other(Program): Other program
Returns:
None
"""
if not isinstance(other, Program):
raise TypeError("copy_param_info_from should be invoked with "
"Program")
if len(self.blocks) != len(other.blocks):
raise ValueError("copy_param_info_from should be invoked with two "
"program, with represent the same topology")
for var in other.global_block().vars.itervalues():
if var.is_data:
self.global_block().var(var.name).is_data = True
def list_vars(self): def list_vars(self):
for each_block in self.blocks: for each_block in self.blocks:
for each_var in each_block.vars.itervalues(): for each_var in each_block.vars.itervalues():
......
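The `is_data` flag introduced above is set by `fluid.layers.data` (see the layers hunk below) and, with `copy_data_info_from`, now survives `Program.clone()`. A hedged illustration of what that enables (Fluid Python API of this era; variable and layer names are illustrative):

```python
# Hedged illustration: the is_data flag is preserved when a program is cloned.
import paddle.fluid as fluid

main = fluid.Program()
with fluid.program_guard(main):
    x = fluid.layers.data(name='x', shape=[1], dtype='float32')  # created with is_data=True
    y = fluid.layers.fc(input=x, size=1)

test_prog = main.clone()  # clone() now also calls copy_data_info_from (per the hunk above)
assert test_prog.global_block().var('x').is_data  # flag carried over to the clone
```

This is what lets the Trainer (further below) discover feed variables on a cloned test program when no explicit `feed_order` is given.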
...@@ -78,8 +78,8 @@ def data(name, ...@@ -78,8 +78,8 @@ def data(name,
dtype=dtype, dtype=dtype,
type=type, type=type,
stop_gradient=stop_gradient, stop_gradient=stop_gradient,
lod_level=lod_level) lod_level=lod_level,
data_var.is_data = True is_data=True)
return data_var return data_var
......
...@@ -80,8 +80,11 @@ def inference_program(is_sparse): ...@@ -80,8 +80,11 @@ def inference_program(is_sparse):
def train_program(is_sparse): def train_program(is_sparse):
next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64') # The declaration of 'next_word' must come after the call to inference_program,
# or the data input order of the train program would be [next_word, firstw, secondw,
# thirdw, forthw], which is not correct.
predict_word = inference_program(is_sparse) predict_word = inference_program(is_sparse)
next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64')
cost = fluid.layers.cross_entropy(input=predict_word, label=next_word) cost = fluid.layers.cross_entropy(input=predict_word, label=next_word)
avg_cost = fluid.layers.mean(cost) avg_cost = fluid.layers.mean(cost)
return avg_cost return avg_cost
...@@ -90,14 +93,17 @@ def train_program(is_sparse): ...@@ -90,14 +93,17 @@ def train_program(is_sparse):
def train(use_cuda, is_sparse, save_path): def train(use_cuda, is_sparse, save_path):
train_reader = paddle.batch( train_reader = paddle.batch(
paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE) paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE)
test_reader = paddle.batch(
paddle.dataset.imikolov.test(word_dict, N), BATCH_SIZE)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
def event_handler(event): def event_handler(event):
print type(event) # print type(event)
if isinstance(event, fluid.EndEpochEvent): if isinstance(event, fluid.EndEpochEvent):
avg_cost = trainer.test(reader=paddle.dataset.imikolov.test( outs = trainer.test(reader=test_reader)
word_dict, N)) avg_cost = outs[0]
print("loss= ", avg_cost)
if avg_cost < 5.0: if avg_cost < 5.0:
trainer.save_params(save_path) trainer.save_params(save_path)
......
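The updated event handler above relies on the new `Trainer.test` API (shown in the next hunk) returning a list of averaged outputs whose first element is the loss. A hedged end-to-end sketch of that loop follows; the Trainer constructor and `train` keyword names are assumptions, since the experimental Trainer API was still in flux, and `word_dict`, `N`, `BATCH_SIZE`, `save_path`, and `train_program` come from the test above:

```python
# Hedged sketch of the high-level loop implied by the word2vec test above.
# Constructor/train keyword names are assumptions.
import paddle
import paddle.fluid as fluid

train_reader = paddle.batch(paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE)
test_reader = paddle.batch(paddle.dataset.imikolov.test(word_dict, N), BATCH_SIZE)

trainer = fluid.Trainer(lambda: train_program(is_sparse=False),
                        optimizer=fluid.optimizer.SGD(learning_rate=0.001),
                        place=fluid.CPUPlace())

def event_handler(event):
    if isinstance(event, fluid.EndEpochEvent):
        avg_cost = trainer.test(reader=test_reader)[0]  # first output is the loss
        print("loss= ", avg_cost)
        if avg_cost < 5.0:
            trainer.save_params(save_path)

trainer.train(num_epochs=1, event_handler=event_handler,
              reader=train_reader, feed_order=None)
```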
...@@ -75,11 +75,15 @@ class Trainer(object): ...@@ -75,11 +75,15 @@ class Trainer(object):
self.train_program = framework.Program() self.train_program = framework.Program()
with framework.program_guard(self.train_program, self.startup_program): with framework.program_guard(self.train_program, self.startup_program):
loss = program_func() program_func_outs = program_func()
self.test_outputs = program_func_outs if isinstance(
program_func_outs, list) else [program_func_outs]
self.test_program = self.train_program.clone()
if not isinstance(optimizer, opt_module.Optimizer): if not isinstance(optimizer, opt_module.Optimizer):
raise TypeError( raise TypeError(
"The optimizer should be an instance of Optimizer") "The optimizer should be an instance of Optimizer")
# The first element of program_func_outs is the loss.
loss = self.test_outputs[0]
optimize_ops, params_grads = optimizer.minimize(loss) optimize_ops, params_grads = optimizer.minimize(loss)
self.place = Trainer._check_and_get_place(place) self.place = Trainer._check_and_get_place(place)
...@@ -168,8 +172,17 @@ class Trainer(object): ...@@ -168,8 +172,17 @@ class Trainer(object):
self._train_by_executor(num_epochs, event_handler, reader, feed_order) self._train_by_executor(num_epochs, event_handler, reader, feed_order)
def test(self, reader): def test(self, reader, feed_order=None):
pass """
Test the model on the given test data.
Args:
reader: The reader that yields test data.
feed_order: Feeding order of the reader. None means to follow the
defining order in the program.
"""
return self._test_by_executor(reader, feed_order, self.test_outputs)
def save_params(self, param_path): def save_params(self, param_path):
# reference: save_persistables in io.py # reference: save_persistables in io.py
...@@ -225,22 +238,10 @@ class Trainer(object): ...@@ -225,22 +238,10 @@ class Trainer(object):
""" """
with self._prog_and_scope_guard(): with self._prog_and_scope_guard():
exe = executor.Executor(self.place) feed_var_list = build_feed_var_list(self.train_program, feed_order)
if feed_order is None:
feed_var_list = [
var
for var in self.train_program.global_block(
).vars.itervalues()
if hasattr(var, 'is_data') and var.is_data
]
else:
feed_var_list = [
self.train_program.global_block().var(var_name)
for var_name in feed_order
]
feeder = data_feeder.DataFeeder( feeder = data_feeder.DataFeeder(
feed_list=feed_var_list, place=self.place) feed_list=feed_var_list, place=self.place)
exe = executor.Executor(self.place)
for epoch_id in range(num_epochs): for epoch_id in range(num_epochs):
event_handler(BeginEpochEvent(epoch_id)) event_handler(BeginEpochEvent(epoch_id))
for step_id, data in enumerate(reader()): for step_id, data in enumerate(reader()):
...@@ -248,3 +249,48 @@ class Trainer(object): ...@@ -248,3 +249,48 @@ class Trainer(object):
exe.run(feed=feeder.feed(data), fetch_list=[]) exe.run(feed=feeder.feed(data), fetch_list=[])
event_handler(EndStepEvent(epoch_id, step_id)) event_handler(EndStepEvent(epoch_id, step_id))
event_handler(EndEpochEvent(epoch_id)) event_handler(EndEpochEvent(epoch_id))
def _test_by_executor(self, reader, feed_order, fetch_list):
with executor.scope_guard(self.scope):
feed_var_list = build_feed_var_list(self.test_program, feed_order)
feeder = data_feeder.DataFeeder(
feed_list=feed_var_list, place=self.place)
exe = executor.Executor(self.place)
accumulated = len(fetch_list) * [0]
count = 0
for data in reader():
outs = exe.run(program=self.test_program,
feed=feeder.feed(data),
fetch_list=fetch_list)
accumulated = [x[0] + x[1][0] for x in zip(accumulated, outs)]
count += 1
return [x / count for x in accumulated]
def build_feed_var_list(program, feed_order):
if not isinstance(program, framework.Program):
raise TypeError("The 'program' should be an object of Program")
if feed_order is None:
feed_var_list = [
var for var in program.global_block().vars.itervalues()
if var.is_data
]
elif isinstance(feed_order, list):
feed_var_list = [
program.global_block().var(var_name) for var_name in feed_order
]
else:
if not isinstance(feed_order, dict):
raise TypeError(
"The 'feed_order' should be either None, list or dict.")
if not sorted(feed_order.values()) == range(len(feed_order)):
raise ValueError(
"The values of 'feed_order' should be a permutation of [0, len(feed_order))"
)
sorted_pair_list = sorted(feed_order.items(), key=lambda item: item[1])
feed_var_list = [
program.global_block().var(pair[0]) for pair in sorted_pair_list
]
return feed_var_list
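`build_feed_var_list` accepts three forms of `feed_order`: `None` (all variables created with `is_data=True`, in declaration order), a list of variable names, or a dict mapping names to feed positions whose values must be a permutation of `range(len(feed_order))`. A hedged usage sketch; `trainer` is a Trainer instance and the layer names follow the word2vec example above:

```python
# Hedged usage sketch for the feed_order variants accepted above.
metrics = trainer.test(reader=test_reader)              # None: use is_data variables
metrics = trainer.test(reader=test_reader,
                       feed_order=['firstw', 'secondw', 'thirdw', 'forthw', 'nextw'])
metrics = trainer.test(reader=test_reader,
                       feed_order={'firstw': 0, 'secondw': 1, 'thirdw': 2,
                                   'forthw': 3, 'nextw': 4})  # values: permutation of range(5)
avg_cost = metrics[0]  # outputs are averaged over test batches; index 0 is the loss
```

Internally, `_test_by_executor` accumulates the first element of each fetched output over all test batches and divides by the batch count, so each entry of the returned list is a per-batch mean.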
...@@ -18,7 +18,9 @@ import math ...@@ -18,7 +18,9 @@ import math
import distributed_splitter as splitter import distributed_splitter as splitter
from .. import core from .. import core
from ..framework import Program, default_main_program, Variable, Parameter from ..framework import Program, default_main_program, \
default_startup_program, \
Variable, Parameter, grad_var_name
LOOKUP_TABLE_TYPE = "lookup_table" LOOKUP_TABLE_TYPE = "lookup_table"
LOOKUP_TABLE_GRAD_TYPE = "lookup_table_grad" LOOKUP_TABLE_GRAD_TYPE = "lookup_table_grad"
...@@ -244,7 +246,7 @@ class DistributeTranspiler: ...@@ -244,7 +246,7 @@ class DistributeTranspiler:
] ]
grad_list = [ grad_list = [
grad for grad in grad_list grad for grad in grad_list
if grad.name != framework.grad_var_name(self.table_name) if grad.name != grad_var_name(self.table_name)
] ]
self.table_param_grad = [ self.table_param_grad = [
param_grad for param_grad in params_grads param_grad for param_grad in params_grads
...@@ -494,7 +496,7 @@ class DistributeTranspiler: ...@@ -494,7 +496,7 @@ class DistributeTranspiler:
were split to several blocks. were split to several blocks.
""" """
s_prog = Program() s_prog = Program()
orig_s_prog = framework.default_startup_program() orig_s_prog = default_startup_program()
params = self.param_grad_ep_mapping[endpoint]["params"] params = self.param_grad_ep_mapping[endpoint]["params"]
def _get_splited_name_and_shape(varname): def _get_splited_name_and_shape(varname):
...@@ -619,7 +621,7 @@ class DistributeTranspiler: ...@@ -619,7 +621,7 @@ class DistributeTranspiler:
# 2. add split_ids_op and send_vars_op to send gradient to pservers # 2. add split_ids_op and send_vars_op to send gradient to pservers
# there should only be one table_name # there should only be one table_name
all_ops = program.global_block().ops all_ops = program.global_block().ops
table_grad_name = framework.grad_var_name(self.table_name) table_grad_name = grad_var_name(self.table_name)
for op in all_ops: for op in all_ops:
if table_grad_name in op.output_arg_names: if table_grad_name in op.output_arg_names:
op_index = list(all_ops).index(op) op_index = list(all_ops).index(op)
...@@ -692,7 +694,7 @@ class DistributeTranspiler: ...@@ -692,7 +694,7 @@ class DistributeTranspiler:
persistable=True) persistable=True)
grad_var = _clone_var( grad_var = _clone_var(
pserver_program.global_block(), pserver_program.global_block(),
self.origin_program.global_block().vars[framework.grad_var_name( self.origin_program.global_block().vars[grad_var_name(
self.table_name)], self.table_name)],
persistable=False) persistable=False)
......
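The transpiler hunk above only changes how `grad_var_name` and `default_startup_program` are referenced: they are now imported directly from `..framework` rather than accessed through a `framework` module alias. A hedged illustration of the behavior being relied on (the exact gradient-name suffix is an assumption):

```python
# Hedged illustration of the direct imports used above.
from paddle.fluid.framework import grad_var_name, default_startup_program

# grad_var_name maps a variable name to its gradient variable's name,
# e.g. 'emb_table' -> 'emb_table@GRAD' (suffix assumed here).
table_grad_name = grad_var_name('emb_table')
startup = default_startup_program()  # program holding the parameter initializers
```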