提交 b8770e4c 编写于 作者: W weixing02

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into ini

......@@ -179,6 +179,7 @@ set(EXTERNAL_LIBS
if(WITH_GPU)
include(cuda)
include(tensorrt)
endif(WITH_GPU)
if(WITH_AMD_GPU)
......
......@@ -45,6 +45,13 @@ ENV PATH=${PATH}:${GOROOT}/bin:${GOPATH}/bin
# install glide
RUN curl -s -q https://glide.sh/get | sh
# Install TensorRT
# The unnecessary files has been removed to make the library small. It only contains include and lib now.
RUN wget -qO- http://paddlepaddledeps.bj.bcebos.com/TensorRT-4.0.0.3.Ubuntu-16.04.4.x86_64-gnu.cuda-8.0.cudnn7.0.tar.gz | \
tar -xz -C /usr/local && \
cp -rf /usr/local/TensorRT/include /usr && \
cp -rf /usr/local/TensorRT/lib /usr
# git credential to skip password typing
RUN git config --global credential.helper store
......@@ -57,7 +64,7 @@ RUN localedef -i en_US -f UTF-8 en_US.UTF-8
# specify sphinx version as 1.5.6 and remove -U option for [pip install -U
# sphinx-rtd-theme] since -U option will cause sphinx being updated to newest
# version(1.7.1 for now), which causes building documentation failed.
RUN pip install --upgrade pip && \
RUN pip install --upgrade pip==9.0.3 && \
pip install -U wheel && \
pip install -U docopt PyYAML sphinx==1.5.6 && \
pip install sphinx-rtd-theme==0.1.9 recommonmark
......
......@@ -27,7 +27,7 @@ RUN git config --global credential.helper store
# Fix locales to en_US.UTF-8
RUN localedef -i en_US -f UTF-8 en_US.UTF-8
RUN pip install --upgrade pip && \
RUN pip install --upgrade pip==9.0.3 && \
pip install -U 'protobuf==3.1.0' && \
pip install -U wheel sphinx && \
pip install pre-commit
......
......@@ -78,7 +78,7 @@ if(NOT CMAKE_CROSSCOMPILING)
/usr/lib/reference/
)
else()
# Diable the finding of reference cblas under host's system path
# Disable the finding of reference cblas under host's system path
set(REFERENCE_CBLAS_INCLUDE_SEARCH_PATHS ${REFERENCE_CBLAS_ROOT}/include)
set(REFERENCE_CBLAS_LIB_SEARCH_PATHS ${REFERENCE_CBLAS_ROOT}/lib)
endif()
......
......@@ -80,6 +80,16 @@ if(WITH_GPU)
# Include cuda and cudnn
include_directories(${CUDNN_INCLUDE_DIR})
include_directories(${CUDA_TOOLKIT_INCLUDE})
if(TENSORRT_FOUND)
if(${CUDA_VERSION_MAJOR} VERSION_LESS 8)
message(FATAL_ERROR "TensorRT needs CUDA >= 8.0 to compile")
endif()
if(${CUDNN_MAJOR_VERSION} VERSION_LESS 7)
message(FATAL_ERROR "TensorRT needs CUDNN >= 7.0 to compile")
endif()
include_directories(${TENSORRT_INCLUDE_DIR})
endif()
elseif(WITH_AMD_GPU)
add_definitions(-DPADDLE_WITH_HIP)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D__HIP_PLATFORM_HCC__")
......
......@@ -33,7 +33,7 @@ ExternalProject_Add(
extern_grpc
DEPENDS protobuf zlib
GIT_REPOSITORY "https://github.com/grpc/grpc.git"
GIT_TAG "v1.11.x"
GIT_TAG "v1.10.x"
PREFIX ${GRPC_SOURCES_DIR}
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
......
if(NOT WITH_GPU)
return()
endif()
set(TENSORRT_ROOT "/usr" CACHE PATH "TENSORRT ROOT")
find_path(TENSORRT_INCLUDE_DIR NvInfer.h
PATHS ${TENSORRT_ROOT} ${TENSORRT_ROOT}/include
$ENV{TENSORRT_ROOT} $ENV{TENSORRT_ROOT}/include
NO_DEFAULT_PATH
)
find_library(TENSORRT_LIBRARY NAMES libnvinfer.so libnvinfer.a
PATHS ${TENSORRT_ROOT} ${TENSORRT_ROOT}/lib
$ENV{TENSORRT_ROOT} $ENV{TENSORRT_ROOT}/lib
NO_DEFAULT_PATH
DOC "Path to TensorRT library.")
if(TENSORRT_INCLUDE_DIR AND TENSORRT_LIBRARY)
set(TENSORRT_FOUND ON)
else()
set(TENSORRT_FOUND OFF)
endif()
if(TENSORRT_FOUND)
file(READ ${TENSORRT_INCLUDE_DIR}/NvInfer.h TENSORRT_VERSION_FILE_CONTENTS)
string(REGEX MATCH "define NV_TENSORRT_MAJOR +([0-9]+)" TENSORRT_MAJOR_VERSION
"${TENSORRT_VERSION_FILE_CONTENTS}")
string(REGEX REPLACE "define NV_TENSORRT_MAJOR +([0-9]+)" "\\1"
TENSORRT_MAJOR_VERSION "${TENSORRT_MAJOR_VERSION}")
message(STATUS "Current TensorRT header is ${TENSORRT_INCLUDE_DIR}/NvInfer.h. "
"Current TensorRT version is v${TENSORRT_MAJOR_VERSION}. ")
endif()
......@@ -3,7 +3,9 @@ add_custom_target(paddle_apis ALL
add_custom_target(paddle_docs ALL
DEPENDS paddle_v2_docs paddle_v2_docs_cn
paddle_fluid_docs paddle_fluid_docs_cn)
paddle_fluid_docs paddle_fluid_docs_cn
paddle_mobile_docs paddle_mobile_docs_cn)
add_subdirectory(v2)
add_subdirectory(fluid)
add_subdirectory(mobile)
......@@ -473,6 +473,12 @@ multiplex
.. autofunction:: paddle.fluid.layers.multiplex
:noindex:
label_smooth
------------
.. autofunction:: paddle.fluid.layers.label_smooth
:noindex:
ops
===
......
......@@ -84,7 +84,7 @@ Running an operator can be asynchronized. There is a thread pool to execute an `
## Synchronize GPU Kernels
The GPU is a non-blocking device. The different streams need be synchronized when switing streams. In current implementation, the synchronization based on the following algorithm:
The GPU is a non-blocking device. The different streams need be synchronized when switching streams. In current implementation, the synchronization based on the following algorithm:
1. `OpHandle` will record `DeviceContext` that it is used.
2. In `OpHandle::Run`, if the `DeviceContext` of current operator is different from `DeviceContext` of any input variable, just wait the generate operator of this input variable.
......
## Distributed training overview doc
Currently Paddle Fluid use parameter server architecture to support distributed training.
For synchronous and asynchronous training, the differences are mostly in the logic of parameter server. Now we have already support synchronous training.
### Synchronous training
The training process of synchronous training is:
![synchronous distributed training](./src/sync_distributed_training.png)
1. Pserver
1. set `barrier_condition_` to 0 and waits for trainers to send gradient.
1. Trainer
1. Trainer read minibatch of data, run forward-backward with local parameter copy and get the gradients for parameters.
1. Trainer use split op to split all the gradient into blocks. The split method is determined at compile time.
1. Trainer use send_op to send all the split gradients to corresponding parameter server.
1. After trainer send all the gradients, it will send a `BATCH_BARRIER_MESSAGE` to all pservers.
1. Trainer call GetVariable to pserver and wait for `barrier_condition_` on pserver to be 1.
1. Pserver
1. Pserver will count the number of `BATCH_BARRIER_MESSAGE`.
1. When the count of `BATCH_BARRIER_MESSAGE` is equal to the number of Trainer. Pserver thinks it received all gradient from all trainers.
1. Pserver will run the optimization block to optimize the parameters.
1. After optimization, pserver set `barrier_condition_` to 1.
1. Pserver wait for `FETCH_BARRIER_MESSAGE`.
1. Trainer.
1. The trainer uses GetVariable to get all the parameters from pserver.
1. Trainer sends a `FETCH_BARRIER_MESSAGE` to each pserver.
1. Pserver.
1. when the number of `FETCH_BARRIER_MESSAGE` reach the number of all trainers. Pserver think all the parameters have been got. it will go back to 1. to set `barrier_condition_` to 0.
### Asynchronous training
In the above process. There are two barriers for all trainers to synchronize with each other. In asynchronous training, these two barriers are not needed. The trainer can just send gradients to pserver and then get parameters back.
The training process of asynchronous training can be:
![asynchronous distributed training](./src/async_distributed_training.png)
1. Pserver:
1. Each parameter has a queue to receive its gradient from trainers.
1. Each parameter has a thread to read data from the queue and run optimize block, using the gradient to optimize the parameter.
1. Using an independent thread to handle RPC call `GetVariable` for trainers to get parameters back.(Maybe here we should use a thread pool to speed up fetching the parameters.)
1. Trainer:
1. Trainer read a batch of data. Run forward and backward with local parameter copy and get the gradients for parameters.
1. Trainer split all gradients to blocks and then send these gradient blocks to pservers(pserver will put them into the queue).
2. Trainer gets all parameters back from pserver.
### Note:
There are also some conditions that need to consider. For exmaple:
1. If trainer needs to wait for the pserver to apply it's gradient and then get back the parameters back.
1. If we need a lock between parameter update and parameter fetch.
1. If one parameter must be on one server, or it can also be split and send to multiple parameter servers.
The above architecture of asynchronous training can support different mode, we can have a detailed test in the future for these problems.
# Design Doc: Asynchronous Update With Distributed Training
## Background
For the typical synchronous distributed training, some significant steps are as follows:
1. A Trainer will compute the gradients and SEND them to the Parameter Server(PServer) nodes.
1. After the PServer node received gradients came from all the Trainers, It will aggregate the
gradient variables for the same parameter into one gradient variable and then apply the aggregated
gradient to the respective parameter, finally using an optimize algorithms(SGD, Monument...)
to update the parameters.
1. The Trainer would wait for the PServers finished the optimize stage, and GET the parameters from PServer,
so all the Trainers would get the same parameters.
In the synchronously distributed training, there should be a `Barrier` to synchronise the
parameters after the optimizing stage. The performance of a distributed training job would
depend on the slowest node if there were hundreds or thousands of training nodes in a
Job, the performance of synchronously distributed training might be very poor because of
the slow node. So this design doc would introduce an approach to implement
*asynchronously* distributed training in PaddlePaddle Fluid.
## Design
<img src="./src/async_update.png" width="600"/>
As the figure above, we describe a global view of asynchronously update process and use
the parameter `w1` as an example to introduce the steps:
1. For each gradient variables, they may distribute on different GPU card and aggregate
them while they are all calculated.
1. Split the gradient variable into multiple blocks according to the number of PServer
instances and then send them.
1. PServer would run an `Optimize Block` using a specified optimize algorithm to update
the specified parameter.
1. The trainer will fetch latest parameter from PServer before running forward Op which depends
on the specified parameter.
1. Broadcast the received variable into multiple GPU cards and continue to run the next
mini-batch.
### Trainer
- For the multiple devices distributed training, we need to aggregate the gradient
variables which placed on different devices firstly and then schedule a `SendVars` Operator to
send the gradient variables to the multiple PServer instances.
- Schedule `FetchVars` operator to fetch the latest parameter from PServer before running
the forward ops.
- There could be a large number of gradient variables to be sent, so we need to use another
thread pool(IO Threadpool) whose a number of the schedulable threads is larger than the
computing thread pool to avoid competitive the thread resources with computing.
### Parameter Server
<img src="./src/async_pserver.png" width="750"/>
- There should be multiple trainer instances want to optimize the same parameter at
the same time, to avoid the racing, we need one `BlockingQueue` for each gradient
variable to process them one by one.
- We need a `Map` structure to map a gradient variable name to the `OptimizeBlock` which
can optimize the respective parameter.
......@@ -4,6 +4,7 @@
.. toctree::
:maxdepth: 1
api_doc_std_cn.md
new_op_cn.md
new_op_kernel.md
use_eigen_cn.md
......
......@@ -4,6 +4,7 @@ Development
.. toctree::
:maxdepth: 1
api_doc_std_en.md
new_op_en.md
new_op_kernel.md
use_eigen_en.md
......
if(NOT DEFINED SPHINX_THEME)
set(SPHINX_THEME default)
endif()
if(NOT DEFINED SPHINX_THEME_DIR)
set(SPHINX_THEME_DIR)
endif()
# configured documentation tools and intermediate build results
set(BINARY_BUILD_DIR_EN "${CMAKE_CURRENT_BINARY_DIR}/en/_build")
# Sphinx cache with pickled ReST documents
set(SPHINX_CACHE_DIR_EN "${CMAKE_CURRENT_BINARY_DIR}/en/_doctrees")
# HTML output director
set(SPHINX_HTML_DIR_EN "${CMAKE_CURRENT_BINARY_DIR}/en/html")
configure_file(
"${CMAKE_CURRENT_SOURCE_DIR}/../templates/conf.py.en.in"
"${BINARY_BUILD_DIR_EN}/conf.py"
@ONLY)
sphinx_add_target(paddle_mobile_docs
html
${BINARY_BUILD_DIR_EN}
${SPHINX_CACHE_DIR_EN}
${CMAKE_CURRENT_SOURCE_DIR}
${SPHINX_HTML_DIR_EN})
add_dependencies(paddle_mobile_docs gen_proto_py paddle_python)
# configured documentation tools and intermediate build results
set(BINARY_BUILD_DIR_CN "${CMAKE_CURRENT_BINARY_DIR}/cn/_build")
# Sphinx cache with pickled ReST documents
set(SPHINX_CACHE_DIR_CN "${CMAKE_CURRENT_BINARY_DIR}/cn/_doctrees")
# HTML output director
set(SPHINX_HTML_DIR_CN "${CMAKE_CURRENT_BINARY_DIR}/cn/html")
configure_file(
"${CMAKE_CURRENT_SOURCE_DIR}/../templates/conf.py.cn.in"
"${BINARY_BUILD_DIR_CN}/conf.py"
@ONLY)
sphinx_add_target(paddle_mobile_docs_cn
html
${BINARY_BUILD_DIR_CN}
${SPHINX_CACHE_DIR_CN}
${CMAKE_CURRENT_SOURCE_DIR}
${SPHINX_HTML_DIR_CN})
add_dependencies(paddle_mobile_docs_cn gen_proto_py paddle_python)
移动端
=====
.. toctree::
:maxdepth: 1
cross_compiling_for_android_cn.md
cross_compiling_for_ios_cn.md
cross_compiling_for_raspberry_cn.md
\ No newline at end of file
Mobile
======
.. toctree::
:maxdepth: 1
cross_compiling_for_android_en.md
cross_compiling_for_ios_en.md
cross_compiling_for_raspberry_en.md
cc_library(var_handle SRCS var_handle.cc DEPS place)
cc_library(op_handle_base SRCS op_handle_base.cc DEPS var_handle device_context)
cc_library(op_handle_base SRCS op_handle_base.cc DEPS var_handle device_context lod_tensor)
cc_library(scale_loss_grad_op_handle SRCS scale_loss_grad_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory)
cc_library(fetch_op_handle SRCS fetch_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory)
nv_library(nccl_all_reduce_op_handle SRCS nccl_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
......@@ -20,3 +20,11 @@ cc_library(multi_devices_graph_builder SRCS multi_devices_graph_builder.cc DEPS
cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ssa_graph framework_proto)
cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS fetch_op_handle ssa_graph_executor scope
simple_threadpool device_context)
cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory)
cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope ddim memory)
cc_test(broadcast_op_test SRCS broadcast_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
device_context broadcast_op_handle)
cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
device_context gather_op_handle)
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/broadcast_op_handle.h"
namespace paddle {
namespace framework {
namespace details {
Tensor *GetTensorFromVar(Variable *in_var) {
if (in_var->IsType<LoDTensor>()) {
return in_var->GetMutable<LoDTensor>();
} else if (in_var->IsType<SelectedRows>()) {
return in_var->GetMutable<SelectedRows>()->mutable_value();
} else {
PADDLE_THROW("Var should be LoDTensor or SelectedRows");
}
return nullptr;
}
BroadcastOpHandle::BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places)
: local_scopes_(local_scopes), places_(places) {}
void BroadcastOpHandle::RunImpl() {
// the input may have dummy var.
std::vector<VarHandle *> in_var_handle;
for (auto *in : inputs_) {
auto *out_handle = dynamic_cast<VarHandle *>(in);
if (out_handle) {
in_var_handle.push_back(out_handle);
}
}
PADDLE_ENFORCE_EQ(in_var_handle.size(), 1,
"The number of input should be one.");
// the output may have dummy var.
std::vector<VarHandle *> out_var_handles;
for (auto *out : outputs_) {
auto *out_handle = dynamic_cast<VarHandle *>(out);
if (out_handle) {
out_var_handles.push_back(out_handle);
}
}
PADDLE_ENFORCE_EQ(
out_var_handles.size(), places_.size(),
"The number of output should equal to the number of places.");
// Wait input done, this Wait is asynchronous operation
auto &in_place = in_var_handle[0]->place_;
if (in_var_handle[0]->generated_op_) {
for (auto *out : out_var_handles) {
auto &out_p = out->place_;
in_var_handle[0]->generated_op_->Wait(dev_ctxes_[out_p]);
}
}
//
auto in_scope_idx = in_var_handle[0]->scope_idx_;
auto in_var =
local_scopes_.at(in_scope_idx)->FindVar(in_var_handle[0]->name_);
Tensor *in_tensor = GetTensorFromVar(in_var);
for (auto *out : out_var_handles) {
auto &out_p = out->place_;
auto out_var = local_scopes_.at(out->scope_idx_)->FindVar(out->name_);
PADDLE_ENFORCE_EQ(out_p.which(), in_place.which(),
"Places must be all on CPU or all on CUDA.");
if (in_var->IsType<framework::SelectedRows>()) {
auto &in_sr = in_var->Get<framework::SelectedRows>();
auto out_sr = out_var->GetMutable<framework::SelectedRows>();
if (&in_sr == out_sr) continue;
out_sr->set_height(in_sr.height());
out_sr->set_rows(in_sr.rows());
out_sr->mutable_value()->Resize(in_sr.value().dims());
out_sr->mutable_value()->mutable_data(out_p, in_sr.value().type());
} else if (in_var->IsType<framework::LoDTensor>()) {
auto in_lod = in_var->Get<framework::LoDTensor>();
auto out_lod = out_var->GetMutable<framework::LoDTensor>();
if (&in_lod == out_lod) continue;
out_lod->set_lod(in_lod.lod());
out_lod->Resize(in_lod.dims());
out_lod->mutable_data(out_p, in_lod.type());
} else {
PADDLE_THROW("Var should be LoDTensor or SelectedRows.");
}
Tensor *out_tensor = GetTensorFromVar(out_var);
paddle::framework::TensorCopy(*in_tensor, out_p, *(dev_ctxes_[in_place]),
out_tensor);
}
}
std::string BroadcastOpHandle::Name() const { return "broadcast"; }
} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <map>
#include <string>
#include <vector>
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/platform/device_context.h"
namespace paddle {
namespace framework {
namespace details {
struct BroadcastOpHandle : public OpHandleBase {
const std::vector<Scope *> &local_scopes_;
const std::vector<platform::Place> &places_;
BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places);
std::string Name() const override;
bool IsMultiDeviceTransfer() override { return false; };
protected:
void RunImpl() override;
};
} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/broadcast_op_handle.h"
#include "gtest/gtest.h"
#include "paddle/fluid/platform/device_context.h"
namespace paddle {
namespace framework {
namespace details {
namespace f = paddle::framework;
namespace p = paddle::platform;
// test data amount
const f::DDim kDims = {20, 20};
struct TestBroadcastOpHandle {
std::vector<std::unique_ptr<p::DeviceContext>> ctxs_;
std::vector<Scope*> local_scopes_;
Scope g_scope_;
std::unique_ptr<OpHandleBase> op_handle_;
std::vector<std::unique_ptr<VarHandleBase>> vars_;
std::vector<p::Place> gpu_list_;
void WaitAll() {
for (size_t j = 0; j < ctxs_.size(); ++j) {
ctxs_[j]->Wait();
}
}
void InitCtxOnGpu(bool use_gpu) {
if (use_gpu) {
#ifdef PADDLE_WITH_CUDA
int count = p::GetCUDADeviceCount();
if (count <= 1) {
LOG(WARNING) << "Cannot test multi-gpu Broadcast, because the CUDA "
"device count is "
<< count;
exit(0);
}
for (int i = 0; i < count; ++i) {
auto p = p::CUDAPlace(i);
gpu_list_.push_back(p);
ctxs_.emplace_back(new p::CUDADeviceContext(p));
}
#else
PADDLE_THROW("CUDA is not support.");
#endif
} else {
int count = 8;
for (int i = 0; i < count; ++i) {
auto p = p::CPUPlace();
gpu_list_.push_back(p);
ctxs_.emplace_back(new p::CPUDeviceContext(p));
}
}
}
void InitBroadcastOp(size_t input_scope_idx) {
for (size_t j = 0; j < gpu_list_.size(); ++j) {
local_scopes_.push_back(&(g_scope_.NewScope()));
local_scopes_[j]->Var("out");
}
local_scopes_[input_scope_idx]->Var("input");
op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_));
vars_.emplace_back(new VarHandle());
VarHandle* in_var_handle = static_cast<VarHandle*>(vars_.back().get());
in_var_handle->place_ = gpu_list_[input_scope_idx];
in_var_handle->name_ = "input";
in_var_handle->version_ = 1;
in_var_handle->scope_idx_ = input_scope_idx;
in_var_handle->generated_op_ = nullptr;
op_handle_->AddInput(in_var_handle);
// add dummy var
vars_.emplace_back(new DummyVarHandle());
DummyVarHandle* dummy_var_handle =
static_cast<DummyVarHandle*>(vars_.back().get());
dummy_var_handle->generated_op_ = nullptr;
op_handle_->AddInput(dummy_var_handle);
for (size_t j = 0; j < gpu_list_.size(); ++j) {
op_handle_->dev_ctxes_[gpu_list_[j]] = ctxs_[j].get();
vars_.emplace_back(new VarHandle());
VarHandle* out_var_handle = static_cast<VarHandle*>(vars_.back().get());
out_var_handle->place_ = gpu_list_[j];
out_var_handle->name_ = "out";
out_var_handle->version_ = 2;
out_var_handle->scope_idx_ = j;
op_handle_->AddOutput(out_var_handle);
}
// add dummy var
vars_.emplace_back(new DummyVarHandle());
DummyVarHandle* out_dummy_var_handle =
static_cast<DummyVarHandle*>(vars_.back().get());
out_dummy_var_handle->generated_op_ = nullptr;
op_handle_->AddOutput(out_dummy_var_handle);
}
void TestBroadcastLodTensor(size_t input_scope_idx) {
auto in_var = local_scopes_[input_scope_idx]->Var("input");
auto in_lod_tensor = in_var->GetMutable<f::LoDTensor>();
in_lod_tensor->mutable_data<float>(kDims, gpu_list_[input_scope_idx]);
std::vector<float> send_vector(static_cast<size_t>(f::product(kDims)));
for (size_t k = 0; k < send_vector.size(); ++k) {
send_vector[k] = k;
}
f::LoD lod{{0, 10, 20}};
paddle::framework::TensorFromVector<float>(
send_vector, *(ctxs_[input_scope_idx]), in_lod_tensor);
in_lod_tensor->set_lod(lod);
op_handle_->Run(false);
WaitAll();
p::CPUPlace cpu_place;
for (size_t j = 0; j < gpu_list_.size(); ++j) {
auto out_var = local_scopes_[j]->Var("out");
auto out_tensor = out_var->Get<f::LoDTensor>();
PADDLE_ENFORCE_EQ(out_tensor.lod(), lod, "lod is not equal.");
f::Tensor result_tensor;
f::TensorCopy(out_tensor, cpu_place, *(ctxs_[j]), &result_tensor);
float* ct = result_tensor.mutable_data<float>(cpu_place);
for (int64_t i = 0; i < f::product(kDims); ++i) {
ASSERT_NEAR(ct[i], send_vector[i], 1e-5);
}
}
}
void TestBroadcastSelectedRows(size_t input_scope_idx) {
auto in_var = local_scopes_[input_scope_idx]->Var("input");
auto in_selected_rows = in_var->GetMutable<f::SelectedRows>();
auto value = in_selected_rows->mutable_value();
value->mutable_data<float>(kDims, gpu_list_[input_scope_idx]);
int height = static_cast<int>(kDims[0]) * 2;
std::vector<int64_t> rows{0, 1, 2, 3, 3, 0, 14, 7, 3, 1,
2, 4, 6, 3, 1, 1, 1, 1, 3, 7};
in_selected_rows->set_height(height);
in_selected_rows->set_rows(rows);
std::vector<float> send_vector(static_cast<size_t>(f::product(kDims)));
for (size_t k = 0; k < send_vector.size(); ++k) {
send_vector[k] = k;
}
paddle::framework::TensorFromVector<float>(
send_vector, *(ctxs_[input_scope_idx]), value);
op_handle_->Run(false);
WaitAll();
p::CPUPlace cpu_place;
for (size_t j = 0; j < gpu_list_.size(); ++j) {
auto out_var = local_scopes_[j]->Var("out");
auto& out_select_rows = out_var->Get<f::SelectedRows>();
auto rt = out_select_rows.value();
PADDLE_ENFORCE_EQ(out_select_rows.height(), height,
"height is not equal.");
for (size_t k = 0; k < out_select_rows.rows().size(); ++k) {
PADDLE_ENFORCE_EQ(out_select_rows.rows()[k], rows[k]);
}
f::Tensor result_tensor;
f::TensorCopy(rt, cpu_place, *(ctxs_[j]), &result_tensor);
float* ct = result_tensor.data<float>();
for (int64_t i = 0; i < f::product(kDims); ++i) {
ASSERT_NEAR(ct[i], send_vector[i], 1e-5);
}
}
}
};
TEST(BroadcastTester, TestCPUBroadcastTestLodTensor) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(false);
test_op.InitBroadcastOp(input_scope_idx);
test_op.TestBroadcastLodTensor(input_scope_idx);
}
TEST(BroadcastTester, TestCPUBroadcastTestSelectedRows) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(false);
test_op.InitBroadcastOp(input_scope_idx);
test_op.TestBroadcastSelectedRows(input_scope_idx);
}
#ifdef PADDLE_WITH_CUDA
TEST(BroadcastTester, TestGPUBroadcastTestLodTensor) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(true);
test_op.InitBroadcastOp(input_scope_idx);
test_op.TestBroadcastLodTensor(input_scope_idx);
}
TEST(BroadcastTester, TestGPUBroadcastTestSelectedRows) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(true);
test_op.InitBroadcastOp(input_scope_idx);
test_op.TestBroadcastSelectedRows(input_scope_idx);
}
#endif
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -35,7 +35,9 @@ void ComputationOpHandle::RunImpl() {
}
}
op_->Run(*scope_->FindVar(kLocalExecScopeName)->Get<Scope *>(), place_);
this->RunAndRecordEvent([this] {
op_->Run(*scope_->FindVar(kLocalExecScopeName)->Get<Scope *>(), place_);
});
}
std::string ComputationOpHandle::Name() const { return op_->Type(); }
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/gather_op_handle.h"
namespace paddle {
namespace framework {
namespace details {
GatherOpHandle::GatherOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places)
: local_scopes_(local_scopes), places_(places) {}
void GatherOpHandle::RunImpl() {
// the input may have dummy var.
std::vector<VarHandle *> in_var_handles;
for (auto *in : inputs_) {
auto *in_handle = dynamic_cast<VarHandle *>(in);
if (in_handle) {
in_var_handles.push_back(in_handle);
}
}
PADDLE_ENFORCE_EQ(
in_var_handles.size(), places_.size(),
"The number of output should equal to the number of places.");
// the output may have dummy var.
std::vector<VarHandle *> out_var_handles;
for (auto *out : outputs_) {
auto *out_handle = dynamic_cast<VarHandle *>(out);
if (out_handle) {
out_var_handles.push_back(out_handle);
}
}
PADDLE_ENFORCE_EQ(out_var_handles.size(), 1,
"The number of output should be one.");
auto in_0_handle = static_cast<VarHandle *>(in_var_handles[0]);
auto pre_in_var =
local_scopes_[in_0_handle->scope_idx_]->FindVar(in_0_handle->name_);
auto pre_place = in_0_handle->place_;
PADDLE_ENFORCE(pre_in_var->IsType<framework::SelectedRows>(),
"Currently, gather_op only can gather SelectedRows.");
PADDLE_ENFORCE_EQ(out_var_handles[0]->place_.which(), pre_place.which(),
"The place of input and output should be the same.");
// Wait input done, this Wait is asynchronous operation
for (auto *in : in_var_handles) {
if (in->generated_op_) {
in->generated_op_->Wait(dev_ctxes_[in->place_]);
}
}
std::vector<int64_t> out_rows;
std::vector<Tensor> in_tensors;
std::vector<platform::Place> in_places;
auto &pre_in = pre_in_var->Get<framework::SelectedRows>();
// gather the inputs
for (auto *in : in_var_handles) {
auto in_handle = static_cast<VarHandle *>(in);
auto in_p = in_handle->place_;
in_places.push_back(in_p);
PADDLE_ENFORCE_EQ(in_p.which(), pre_place.which(),
"Places must be all on CPU or all on CUDA.");
auto in_var =
local_scopes_.at(in_handle->scope_idx_)->FindVar(in_handle->name_);
auto &in_sr = in_var->Get<framework::SelectedRows>();
PADDLE_ENFORCE_EQ(in_sr.value().type(), pre_in.value().type(),
"The type of input is not consistent.");
PADDLE_ENFORCE_EQ(pre_in.height(), in_sr.height(),
"The height of inputs is not consistent.");
PADDLE_ENFORCE_EQ(pre_in.GetCompleteDims(), in_sr.GetCompleteDims(), ,
"The dims of inputs is not consistent.");
auto in_sr_rows = in_sr.rows();
out_rows.insert(out_rows.end(), in_sr_rows.begin(), in_sr_rows.end());
in_tensors.emplace_back(in_sr.value());
}
// write the output
auto &out_place = out_var_handles[0]->place_;
auto out_scope_idx = out_var_handles[0]->scope_idx_;
auto out_var =
local_scopes_[out_scope_idx]->FindVar(out_var_handles[0]->name_);
auto out = out_var->GetMutable<framework::SelectedRows>();
out->set_height(pre_in.height());
out->set_rows(out_rows);
size_t rows = out_rows.size();
DDim out_dim = pre_in.GetCompleteDims();
out_dim[0] = static_cast<int64_t>(rows);
out->mutable_value()->Resize(out_dim);
out->mutable_value()->mutable_data(out_place, pre_in.value().type());
Tensor *out_tensor = out->mutable_value();
// copy
int s = 0, e = 0;
for (size_t j = 0; j < in_tensors.size(); ++j) {
e += in_tensors[j].dims()[0];
auto sub_out = out_tensor->Slice(s, e);
paddle::framework::TensorCopy(in_tensors[j], out_place,
*(dev_ctxes_[in_places[j]]), &sub_out);
s = e;
}
}
std::string GatherOpHandle::Name() const { return "gather"; }
} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <map>
#include <string>
#include <vector>
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/platform/device_context.h"
namespace paddle {
namespace framework {
namespace details {
struct GatherOpHandle : public OpHandleBase {
const std::vector<Scope *> &local_scopes_;
const std::vector<platform::Place> &places_;
GatherOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places);
std::string Name() const override;
bool IsMultiDeviceTransfer() override { return false; };
protected:
void RunImpl() override;
};
} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/gather_op_handle.h"
#include "gtest/gtest.h"
#include "paddle/fluid/platform/device_context.h"
namespace paddle {
namespace framework {
namespace details {
namespace f = paddle::framework;
namespace p = paddle::platform;
// test data amount
const f::DDim kDims = {20, 20};
struct TestGatherOpHandle {
std::vector<std::unique_ptr<p::DeviceContext>> ctxs_;
std::vector<Scope*> local_scopes_;
Scope g_scope_;
std::unique_ptr<OpHandleBase> op_handle_;
std::vector<std::unique_ptr<VarHandleBase>> vars_;
std::vector<p::Place> gpu_list_;
void WaitAll() {
for (size_t j = 0; j < ctxs_.size(); ++j) {
ctxs_[j]->Wait();
}
}
void InitCtxOnGpu(bool use_gpu) {
if (use_gpu) {
#ifdef PADDLE_WITH_CUDA
int count = p::GetCUDADeviceCount();
if (count <= 1) {
LOG(WARNING) << "Cannot test multi-gpu Broadcast, because the CUDA "
"device count is "
<< count;
exit(0);
}
for (int i = 0; i < count; ++i) {
auto p = p::CUDAPlace(i);
gpu_list_.push_back(p);
ctxs_.emplace_back(new p::CUDADeviceContext(p));
}
#else
PADDLE_THROW("CUDA is not support.");
#endif
} else {
int count = 8;
for (int i = 0; i < count; ++i) {
auto p = p::CPUPlace();
gpu_list_.push_back(p);
ctxs_.emplace_back(new p::CPUDeviceContext(p));
}
}
}
void InitGatherOp(size_t input_scope_idx) {
for (size_t j = 0; j < gpu_list_.size(); ++j) {
local_scopes_.push_back(&(g_scope_.NewScope()));
local_scopes_[j]->Var("out");
}
local_scopes_[input_scope_idx]->Var("input");
op_handle_.reset(new GatherOpHandle(local_scopes_, gpu_list_));
// add input
for (size_t j = 0; j < gpu_list_.size(); ++j) {
op_handle_->dev_ctxes_[gpu_list_[j]] = ctxs_[j].get();
vars_.emplace_back(new VarHandle());
VarHandle* in_var_handle = static_cast<VarHandle*>(vars_.back().get());
in_var_handle->place_ = gpu_list_[j];
in_var_handle->name_ = "input";
in_var_handle->version_ = 1;
in_var_handle->scope_idx_ = j;
in_var_handle->generated_op_ = nullptr;
op_handle_->AddInput(in_var_handle);
}
// add dummy var
vars_.emplace_back(new DummyVarHandle());
DummyVarHandle* in_dummy_var_handle =
static_cast<DummyVarHandle*>(vars_.back().get());
in_dummy_var_handle->generated_op_ = nullptr;
op_handle_->AddInput(in_dummy_var_handle);
// add output
vars_.emplace_back(new VarHandle());
VarHandle* out_var_handle = static_cast<VarHandle*>(vars_.back().get());
out_var_handle->place_ = gpu_list_[input_scope_idx];
out_var_handle->name_ = "out";
out_var_handle->version_ = 2;
out_var_handle->scope_idx_ = input_scope_idx;
op_handle_->AddOutput(out_var_handle);
// add dummy var
vars_.emplace_back(new DummyVarHandle());
DummyVarHandle* dummy_var_handle =
static_cast<DummyVarHandle*>(vars_.back().get());
op_handle_->AddOutput(dummy_var_handle);
}
void TestGatherSelectedRows(size_t output_scope_idx) {
int height = kDims[0] * 2;
std::vector<int64_t> rows{0, 1, 2, 3, 3, 0, 14, 7, 3, 1,
2, 4, 6, 3, 1, 1, 1, 1, 3, 7};
std::vector<float> send_vector(f::product(kDims));
for (size_t k = 0; k < send_vector.size(); ++k) {
send_vector[k] = k;
}
for (size_t input_scope_idx = 0; input_scope_idx < gpu_list_.size();
++input_scope_idx) {
auto in_var = local_scopes_[input_scope_idx]->Var("input");
auto in_selected_rows = in_var->GetMutable<f::SelectedRows>();
auto value = in_selected_rows->mutable_value();
value->mutable_data<float>(kDims, gpu_list_[input_scope_idx]);
in_selected_rows->set_height(height);
in_selected_rows->set_rows(rows);
paddle::framework::TensorFromVector<float>(
send_vector, *(ctxs_[input_scope_idx]), value);
value->Resize(kDims);
}
auto out_var = local_scopes_[output_scope_idx]->Var("out");
auto out_selected_rows = out_var->GetMutable<f::SelectedRows>();
auto in_var = local_scopes_[output_scope_idx]->Var("input");
auto in_selected_rows = in_var->GetMutable<f::SelectedRows>();
out_selected_rows->mutable_value()->ShareDataWith(
in_selected_rows->value());
op_handle_->Run(false);
WaitAll();
p::CPUPlace cpu_place;
auto& out_select_rows = out_var->Get<f::SelectedRows>();
auto rt = out_select_rows.value();
PADDLE_ENFORCE_EQ(out_select_rows.height(), height, "height is not equal.");
for (size_t k = 0; k < out_select_rows.rows().size(); ++k) {
PADDLE_ENFORCE_EQ(out_select_rows.rows()[k], rows[k % rows.size()]);
}
f::Tensor result_tensor;
f::TensorCopy(rt, cpu_place, *(ctxs_[output_scope_idx]), &result_tensor);
float* ct = result_tensor.data<float>();
for (int64_t j = 0; j < f::product(kDims); ++j) {
ASSERT_NEAR(ct[j], send_vector[j % send_vector.size()], 1e-5);
}
}
};
TEST(GatherTester, TestCPUGatherTestSelectedRows) {
TestGatherOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(false);
test_op.InitGatherOp(input_scope_idx);
test_op.TestGatherSelectedRows(input_scope_idx);
}
#ifdef PADDLE_WITH_CUDA
TEST(GatherTester, TestGPUGatherTestSelectedRows) {
TestGatherOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(false);
test_op.InitGatherOp(input_scope_idx);
test_op.TestGatherSelectedRows(input_scope_idx);
}
#endif
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -55,21 +55,21 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder(
}
}
void MultiDevSSAGraphBuilder::CreateOpHandleIOs(SSAGraph *result, OpDesc *op,
void MultiDevSSAGraphBuilder::CreateOpHandleIOs(SSAGraph *result,
const OpDesc &op,
const platform::Place &p,
const size_t &i) const {
auto *op_handle = result->ops_.back().get();
op_handle->dev_ctxes_[p] = const_cast<platform::DeviceContext *>(
platform::DeviceContextPool::Instance().Get(p));
op_handle->dev_ctxes_[p] = platform::DeviceContextPool::Instance().Get(p);
auto var_names = op->InputArgumentNames();
auto var_names = op.InputArgumentNames();
for (auto &each_var_name : var_names) {
VarHandle *var = CreateOrGetLatestVarHandle(result, each_var_name, p, i);
op_handle->AddInput(var);
}
var_names = op->OutputArgumentNames();
var_names = op.OutputArgumentNames();
for (auto &each_var_name : var_names) {
CreateOpOutput(result, op_handle, each_var_name, p, i);
......@@ -107,7 +107,7 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
result.ops_.emplace_back(new SendOpHandle(*op, s, p));
// Create inputs for output on original place and no ssa output
// is created for send op.
CreateOpHandleIOs(&result, op, p, 0);
CreateOpHandleIOs(&result, *op, p, 0);
continue;
}
......@@ -117,7 +117,7 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
result.ops_.emplace_back(new ComputationOpHandle(*op, s, p));
auto *op_handle = result.ops_.back().get();
CreateOpHandleIOs(&result, op, p, i);
CreateOpHandleIOs(&result, *op, p, i);
auto var_names = op->OutputArgumentNames();
......
......@@ -45,8 +45,8 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
std::unique_ptr<SSAGraph> Build(const ProgramDesc &program) const override;
private:
void CreateOpHandleIOs(SSAGraph *result, OpDesc *op, const platform::Place &p,
const size_t &i) const;
void CreateOpHandleIOs(SSAGraph *result, const OpDesc &op,
const platform::Place &p, const size_t &i) const;
private:
std::string loss_var_name_;
......
......@@ -14,6 +14,8 @@
#include "paddle/fluid/framework/details/nccl_all_reduce_op_handle.h"
#include <algorithm>
namespace paddle {
namespace framework {
namespace details {
......@@ -27,6 +29,32 @@ NCCLAllReduceOpHandle::NCCLAllReduceOpHandle(
}
}
struct ReduceLoDTensor {
const std::vector<LoDTensor> &src_tensors_;
LoDTensor &dst_tensor_;
ReduceLoDTensor(const std::vector<LoDTensor> &src, LoDTensor *dst)
: src_tensors_(src), dst_tensor_(*dst) {}
template <typename T>
void operator()() const {
PADDLE_ENFORCE(!src_tensors_.empty());
auto &t0 = src_tensors_[0];
PADDLE_ENFORCE_NE(t0.numel(), 0);
dst_tensor_.Resize(t0.dims());
T *dst = dst_tensor_.mutable_data<T>(platform::CPUPlace());
std::copy(t0.data<T>(), t0.data<T>() + t0.numel(), dst);
for (size_t i = 1; i < src_tensors_.size(); ++i) {
auto &t = src_tensors_[i];
PADDLE_ENFORCE_EQ(t.dims(), t0.dims());
PADDLE_ENFORCE_EQ(t.type(), t0.type());
std::transform(t.data<T>(), t.data<T>() + t.numel(), dst, dst,
[](T a, T b) -> T { return a + b; });
}
}
};
void NCCLAllReduceOpHandle::RunImpl() {
if (inputs_.size() == 1) {
return; // No need to all reduce when GPU count = 1;
......@@ -41,37 +69,66 @@ void NCCLAllReduceOpHandle::RunImpl() {
int dtype = -1;
size_t numel = 0;
std::vector<std::function<void()>> all_reduce_calls;
std::vector<LoDTensor> lod_tensors;
for (size_t i = 0; i < local_scopes_.size(); ++i) {
auto &p = places_[i];
auto *s = local_scopes_[i];
int dev_id = boost::get<platform::CUDAPlace>(p).device;
auto &lod_tensor = s->FindVar(var_name)->Get<LoDTensor>();
void *buffer = const_cast<void *>(lod_tensor.data<void>());
lod_tensors.emplace_back(lod_tensor);
}
if (dtype == -1) {
dtype = platform::ToNCCLDataType(lod_tensor.type());
}
if (platform::is_gpu_place(lod_tensors[0].place())) {
std::vector<std::function<void()>> all_reduce_calls;
for (size_t i = 0; i < local_scopes_.size(); ++i) {
auto &p = places_[i];
auto &lod_tensor = lod_tensors[i];
void *buffer = const_cast<void *>(lod_tensor.data<void>());
if (numel == 0) {
numel = static_cast<size_t>(lod_tensor.numel());
}
if (dtype == -1) {
dtype = platform::ToNCCLDataType(lod_tensor.type());
}
auto &nccl_ctx = nccl_ctxs_.at(dev_id);
auto stream = nccl_ctx.stream();
auto comm = nccl_ctx.comm_;
all_reduce_calls.emplace_back([=] {
PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
buffer, buffer, numel, static_cast<ncclDataType_t>(dtype), ncclSum,
comm, stream));
if (numel == 0) {
numel = static_cast<size_t>(lod_tensor.numel());
}
int dev_id = boost::get<platform::CUDAPlace>(p).device;
auto &nccl_ctx = nccl_ctxs_.at(dev_id);
auto stream = nccl_ctx.stream();
auto comm = nccl_ctx.comm_;
all_reduce_calls.emplace_back([=] {
PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
buffer, buffer, numel, static_cast<ncclDataType_t>(dtype),
ncclSum, comm, stream));
});
}
this->RunAndRecordEvent([&] {
platform::NCCLGroupGuard guard;
for (auto &call : all_reduce_calls) {
call();
}
});
}
} else { // Special handle CPU only Operator's gradient. Like CRF
auto &trg =
*this->local_scopes_[0]->Var()->GetMutable<framework::LoDTensor>();
// Reduce All Tensor to trg in CPU
ReduceLoDTensor func(lod_tensors, &trg);
VisitDataType(ToDataType(lod_tensors[0].type()), func);
platform::NCCLGroupGuard guard;
for (auto &call : all_reduce_calls) {
call();
for (size_t i = 0; i < local_scopes_.size(); ++i) {
auto &scope = local_scopes_[i];
auto &p = places_[i];
auto *var = scope->FindVar(var_name);
auto *dev_ctx = dev_ctxes_[p];
RunAndRecordEvent(p, [&trg, var, dev_ctx, p] {
auto &tensor_gpu = *var->GetMutable<framework::LoDTensor>();
auto &tensor_cpu = trg;
TensorCopy(tensor_cpu, p, *dev_ctx, &tensor_gpu);
});
}
}
}
}
......
......@@ -54,17 +54,6 @@ void OpHandleBase::Run(bool use_event) {
#endif
RunImpl();
#ifdef PADDLE_WITH_CUDA
if (use_event) {
for (auto &p : dev_ctxes_) {
int dev_id = boost::get<platform::CUDAPlace>(p.first).device;
auto stream =
static_cast<platform::CUDADeviceContext *>(p.second)->stream();
PADDLE_ENFORCE(cudaEventRecord(events_.at(dev_id), stream));
}
}
#endif
}
void OpHandleBase::Wait(platform::DeviceContext *waited_dev) {
......@@ -97,6 +86,43 @@ void OpHandleBase::AddOutput(VarHandleBase *out) {
out->generated_op_ = this;
}
void OpHandleBase::RunAndRecordEvent(const std::function<void()> &callback) {
#ifdef PADDLE_WITH_CUDA
if (!events_.empty()) { // Use event
std::function<void()> method = callback;
for (auto &p : dev_ctxes_) {
method = [method, p, this]() {
static_cast<platform::CUDADeviceContext *>(p.second)->RecordEvent(
events_.at(boost::get<platform::CUDAPlace>(p.first).device),
method);
};
}
method();
} else {
#endif
callback();
#ifdef PADDLE_WITH_CUDA
}
#endif
}
void OpHandleBase::RunAndRecordEvent(platform::Place p,
const std::function<void()> &callback) {
#ifdef PADDLE_WITH_CUDA
if (platform::is_cpu_place(p) || events_.empty()) {
callback();
} else {
auto *ctx = dev_ctxes_.at(p);
auto *cuda_ctx = static_cast<platform::CUDADeviceContext *>(ctx);
cuda_ctx->RecordEvent(events_.at(boost::get<platform::CUDAPlace>(p).device),
callback);
}
#else
callback();
#endif
}
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -62,6 +62,11 @@ class OpHandleBase {
virtual bool IsMultiDeviceTransfer() { return false; }
protected:
void RunAndRecordEvent(const std::function<void()> &callback);
void RunAndRecordEvent(platform::Place p,
const std::function<void()> &callback);
virtual void RunImpl() = 0;
};
......
......@@ -14,6 +14,8 @@
#include "paddle/fluid/framework/details/scale_loss_grad_op_handle.h"
#include <string>
namespace paddle {
namespace framework {
namespace details {
......@@ -37,11 +39,13 @@ void ScaleLossGradOpHandle::RunImpl() {
*tmp = coeff_;
} else {
#ifdef PADDLE_WITH_CUDA
auto stream =
static_cast<platform::CUDADeviceContext *>(this->dev_ctxes_[place_])
->stream();
memory::Copy(boost::get<platform::CUDAPlace>(place_), tmp,
platform::CPUPlace(), &coeff_, sizeof(float), stream);
this->RunAndRecordEvent([&] {
auto stream =
static_cast<platform::CUDADeviceContext *>(this->dev_ctxes_[place_])
->stream();
memory::Copy(boost::get<platform::CUDAPlace>(place_), tmp,
platform::CPUPlace(), &coeff_, sizeof(float), stream);
});
#endif
}
}
......
......@@ -34,7 +34,7 @@ void SendOpHandle::RunImpl() {
}
in->generated_op_->Wait(dev_ctxes_[p]);
}
op_->Run(*local_scope_, place_);
this->RunAndRecordEvent([&] { op_->Run(*local_scope_, place_); });
}
std::string SendOpHandle::Name() const { return "send"; }
......
......@@ -33,13 +33,6 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor(
running_ops_(0),
allow_op_delay_(allow_op_delay) {}
void ThreadedSSAGraphExecutor::RunDelayedOps(
const std::unordered_set<OpHandleBase *> &delayed_ops) {
for (auto op : delayed_ops) {
op->Run(use_event_);
}
}
FeedFetchList ThreadedSSAGraphExecutor::Run(
const std::vector<std::string> &fetch_tensors) {
std::unordered_map<OpHandleBase *, size_t> pending_ops;
......@@ -51,8 +44,6 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
// together since we currently cannot overlap computation and memcpy streams.
// Should revisit it if overlapping is available.
std::unordered_set<OpHandleBase *> delayed_ops;
std::unordered_set<OpHandleBase *> blocked_by_delayed_ops;
std::unordered_set<VarHandleBase *> delayed_vars;
auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) {
pending_vars.insert(&var);
......@@ -122,24 +113,26 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
InsertPendingOp(*op);
}
auto run_all_ready_ops = [&] {
for (auto *op : ready_ops) {
if (op->IsMultiDeviceTransfer() && allow_op_delay_) {
delayed_ops.insert(op);
delayed_vars.insert(op->outputs_.begin(), op->outputs_.end());
ready_vars.Extend(op->outputs_);
continue;
}
auto run_all_ops = [&](std::unordered_set<OpHandleBase *> &set) {
for (auto *op : set) {
running_ops_++;
RunOp(&ready_vars, op);
}
ready_ops.clear();
set.clear();
};
// Step 3. Execution
while (!pending_vars.empty() || !ready_ops.empty() || !delayed_ops.empty()) {
while (!pending_vars.empty()) {
// 1. Run All Ready ops
run_all_ready_ops();
// Keep loop until all vars are ready.
//
// NOTE: DelayedOps have a lower priority. It will be scheduled after all
// ready_ops have been performed.
if (ready_ops.empty() && allow_op_delay_) {
run_all_ops(delayed_ops);
} else {
run_all_ops(ready_ops);
}
// 2. Find ready variable
bool timeout;
......@@ -160,29 +153,16 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
auto &deps = pending_ops[op];
--deps;
if (deps == 0) {
if (delayed_vars.find(ready_var) != delayed_vars.end()) {
blocked_by_delayed_ops.insert(op);
if (op->IsMultiDeviceTransfer() && allow_op_delay_) {
delayed_ops.insert(op);
} else {
ready_ops.insert(op);
}
}
}
}
// When there are no other ops to schedule, schedule buffered delayed
// ops and unblock other ops.
if (ready_ops.empty() && !delayed_ops.empty() && running_ops_ == 0) {
RunDelayedOps(delayed_ops);
delayed_ops.clear();
for (auto *op : blocked_by_delayed_ops) {
ready_ops.insert(op);
}
blocked_by_delayed_ops.clear();
}
// Keep loop until all vars are ready.
}
PADDLE_ENFORCE(ready_ops.empty());
PADDLE_ENFORCE(delayed_ops.empty());
PADDLE_ENFORCE(blocked_by_delayed_ops.empty());
// Wait FetchOps.
if (!fetch_ops.empty()) {
......@@ -196,10 +176,12 @@ void ThreadedSSAGraphExecutor::RunOp(
BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) {
auto op_run = [ready_var_q, op, this] {
try {
VLOG(10) << op->Name() << " : " << op->DebugString();
VLOG(10) << op << " " << op->Name() << " : " << op->DebugString();
op->Run(use_event_);
VLOG(10) << op << " " << op->Name() << " Done ";
running_ops_--;
ready_var_q->Extend(op->outputs_);
VLOG(10) << op << " " << op->Name() << "Signal posted";
} catch (platform::EnforceNotMet ex) {
exception_.reset(new platform::EnforceNotMet(ex));
} catch (...) {
......
......@@ -88,8 +88,6 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
void RunOp(BlockingQueue<VarHandleBase *> *ready_var_q,
details::OpHandleBase *op);
void RunDelayedOps(const std::unordered_set<OpHandleBase *> &delayed_ops);
private:
std::unique_ptr<::ThreadPool> pool_;
std::vector<Scope *> local_scopes_;
......
......@@ -50,6 +50,7 @@ struct VarHandle : public VarHandleBase {
// version field currently is not used, however, just store the version to
// debug easily.
size_t version_;
size_t scope_idx_;
std::string name_;
platform::Place place_;
};
......
......@@ -83,8 +83,8 @@ static void CheckTensorNANOrInf(const std::string& name,
if (tensor.memory_size() == 0) {
return;
}
if (tensor.type().hash_code() != typeid(float).hash_code() &&
tensor.type().hash_code() != typeid(double).hash_code()) {
if (tensor.type().hash_code() != typeid(float).hash_code() && // NOLINT
tensor.type().hash_code() != typeid(double).hash_code()) { // NOLINT
return;
}
PADDLE_ENFORCE(!framework::TensorContainsInf(tensor),
......@@ -145,12 +145,13 @@ void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
// Return true if the block has feed operators and holder of matching info.
static bool has_feed_operators(
const BlockDesc& block,
std::map<std::string, const LoDTensor*>& feed_targets,
const std::map<std::string, const LoDTensor*>& feed_targets,
const std::string& feed_holder_name) {
size_t feed_count = 0;
for (auto* op : block.AllOps()) {
if (op->Type() == kFeedOpType) {
feed_count++;
// The input variable's name of feed_op should be feed_holder_name.
PADDLE_ENFORCE_EQ(op->Input("X")[0], feed_holder_name,
"Input to feed op should be '%s'", feed_holder_name);
std::string feed_target_name = op->Output("Out")[0];
......@@ -166,13 +167,15 @@ static bool has_feed_operators(
feed_count, feed_targets.size(),
"The number of feed operators should match 'feed_targets'");
// When feed operator are present, so should be feed_holder
auto var = block.FindVar(feed_holder_name);
PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable",
feed_holder_name);
PADDLE_ENFORCE_EQ(var->GetType(), proto::VarType::FEED_MINIBATCH,
"'%s' variable should be 'FEED_MINIBATCH' type",
feed_holder_name);
if (!feed_holder_name.empty()) {
// When feed operator are present, so should be feed_holder.
auto var = block.FindVar(feed_holder_name);
PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable",
feed_holder_name);
PADDLE_ENFORCE_EQ(var->GetType(), proto::VarType::FEED_MINIBATCH,
"'%s' variable should be 'FEED_MINIBATCH' type",
feed_holder_name);
}
}
return feed_count > 0;
......@@ -185,12 +188,14 @@ static bool has_feed_operators(
// and fetch_holder_name. Raise exception when any mismatch is found.
// Return true if the block has fetch operators and holder of matching info.
static bool has_fetch_operators(
const BlockDesc& block, std::map<std::string, LoDTensor*>& fetch_targets,
const BlockDesc& block,
const std::map<std::string, LoDTensor*>& fetch_targets,
const std::string& fetch_holder_name) {
size_t fetch_count = 0;
for (auto* op : block.AllOps()) {
if (op->Type() == kFetchOpType) {
fetch_count++;
// The output variable's name of fetch_op should be fetch_holder_name.
PADDLE_ENFORCE_EQ(op->Output("Out")[0], fetch_holder_name,
"Output of fetch op should be '%s'", fetch_holder_name);
std::string fetch_target_name = op->Input("X")[0];
......@@ -206,13 +211,15 @@ static bool has_fetch_operators(
fetch_count, fetch_targets.size(),
"The number of fetch operators should match 'fetch_targets'");
// When fetch operator are present, so should be fetch_holder
auto var = block.FindVar(fetch_holder_name);
PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable",
fetch_holder_name);
PADDLE_ENFORCE_EQ(var->GetType(), proto::VarType::FETCH_LIST,
"'%s' variable should be 'FETCH_LIST' type",
fetch_holder_name);
if (!fetch_holder_name.empty()) {
// When fetch operator are present, so should be fetch_holder.
auto var = block.FindVar(fetch_holder_name);
PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable",
fetch_holder_name);
PADDLE_ENFORCE_EQ(var->GetType(), proto::VarType::FETCH_LIST,
"'%s' variable should be 'FETCH_LIST' type",
fetch_holder_name);
}
}
return fetch_count > 0;
......@@ -259,16 +266,6 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
}
}
// map the data of feed_targets to feed_holder
for (auto* op : global_block->AllOps()) {
if (op->Type() == kFeedOpType) {
std::string feed_target_name = op->Output("Out")[0];
int idx = boost::get<int>(op->GetAttr("col"));
SetFeedVariable(scope, *feed_targets[feed_target_name], feed_holder_name,
idx);
}
}
if (!has_fetch_ops) {
// create fetch_holder variable
auto* fetch_holder = global_block->Var(fetch_holder_name);
......@@ -292,17 +289,9 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
}
}
Run(*copy_program, scope, 0, create_vars, create_vars);
// obtain the data of fetch_targets from fetch_holder
for (auto* op : global_block->AllOps()) {
if (op->Type() == kFetchOpType) {
std::string fetch_target_name = op->Input("X")[0];
int idx = boost::get<int>(op->GetAttr("col"));
*fetch_targets[fetch_target_name] =
GetFetchVariable(*scope, fetch_holder_name, idx);
}
}
auto ctx = Prepare(*copy_program, 0);
RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets, create_vars,
feed_holder_name, fetch_holder_name);
}
std::unique_ptr<ExecutorPrepareContext> Executor::Prepare(
......@@ -370,5 +359,42 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
}
}
void Executor::RunPreparedContext(
ExecutorPrepareContext* ctx, Scope* scope,
std::map<std::string, const LoDTensor*>& feed_targets,
std::map<std::string, LoDTensor*>& fetch_targets, bool create_vars,
const std::string& feed_holder_name, const std::string& fetch_holder_name) {
auto& global_block = ctx->prog_.Block(ctx->block_id_);
PADDLE_ENFORCE(
has_feed_operators(global_block, feed_targets, feed_holder_name),
"Program in ExecutorPrepareContext should has feed_ops.");
PADDLE_ENFORCE(
has_fetch_operators(global_block, fetch_targets, fetch_holder_name),
"Program in the prepared context should has fetch_ops.");
// map the data of feed_targets to feed_holder
for (auto* op : global_block.AllOps()) {
if (op->Type() == kFeedOpType) {
std::string feed_target_name = op->Output("Out")[0];
int idx = boost::get<int>(op->GetAttr("col"));
SetFeedVariable(scope, *feed_targets[feed_target_name], feed_holder_name,
idx);
}
}
RunPreparedContext(ctx, scope, create_vars, create_vars);
// obtain the data of fetch_targets from fetch_holder
for (auto* op : global_block.AllOps()) {
if (op->Type() == kFetchOpType) {
std::string fetch_target_name = op->Input("X")[0];
int idx = boost::get<int>(op->GetAttr("col"));
*fetch_targets[fetch_target_name] =
GetFetchVariable(*scope, fetch_holder_name, idx);
}
}
}
} // namespace framework
} // namespace paddle
......@@ -14,6 +14,9 @@ limitations under the License. */
#pragma once
#include <map>
#include <string>
#include <vector>
#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
......@@ -70,6 +73,13 @@ class Executor {
bool create_local_scope = true,
bool create_vars = true);
void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
std::map<std::string, const LoDTensor*>& feed_targets,
std::map<std::string, LoDTensor*>& fetch_targets,
bool create_vars = true,
const std::string& feed_holder_name = "feed",
const std::string& fetch_holder_name = "fetch");
private:
const platform::Place place_;
};
......
......@@ -155,13 +155,9 @@ void ParallelExecutor::BCastParamsToGPUs(
#endif
}
void ParallelExecutor::Run(
const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name,
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name) {
platform::RecordBlock b(0);
SplitTensorToPlaces(feed_tensors);
// Create local scopes.
for (auto &scope : member_->local_scopes_) {
Scope &local_scope = scope->NewScope();
......@@ -195,14 +191,28 @@ void ParallelExecutor::Run(
auto &local_scope =
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>();
scope->DeleteScope(local_scope);
local_scope = nullptr;
}
}
void ParallelExecutor::SplitTensorToPlaces(
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
for (auto it : feed_tensors) {
auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
void ParallelExecutor::FeedTensorsIntoLocalScopes(
const std::vector<std::unordered_map<std::string, LoDTensor>> &tensors) {
PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size());
for (size_t i = 0; i < tensors.size(); ++i) {
auto &map = tensors[i];
auto *scope = member_->local_scopes_[i];
for (auto &pair : map) {
auto *trg = scope->Var(pair.first)->GetMutable<LoDTensor>();
trg->ShareDataWith(pair.second);
trg->set_lod(pair.second.lod());
}
}
}
void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
const std::unordered_map<std::string, LoDTensor> &tensors) {
for (auto pair : tensors) {
auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
PADDLE_ENFORCE_EQ(
member_->places_.size(), lod_tensors.size(),
"The number of samples of current batch is less than the count of "
......@@ -211,7 +221,7 @@ void ParallelExecutor::SplitTensorToPlaces(
for (size_t j = 0; j < member_->places_.size(); ++j) {
// TODO(panxy0718): Do I need to delete this var?
auto t =
member_->local_scopes_[j]->Var(it.first)->GetMutable<LoDTensor>();
member_->local_scopes_[j]->Var(pair.first)->GetMutable<LoDTensor>();
t->ShareDataWith(lod_tensors[j]);
t->set_lod(lod_tensors[j].lod());
}
......
......@@ -44,16 +44,22 @@ class ParallelExecutor {
std::vector<Scope*>& GetLocalScopes();
/**
* Feed tensors to local scopes. The size of tensors should be equal to the
* size of local scopes.
*/
void FeedTensorsIntoLocalScopes(
const std::vector<std::unordered_map<std::string, LoDTensor>>& tensors);
void FeedAndSplitTensorIntoLocalScopes(
const std::unordered_map<std::string, LoDTensor>& tensors);
void Run(const std::vector<std::string>& fetch_tensors,
const std::string& fetched_var_name,
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
const std::string& fetched_var_name);
void BCastParamsToGPUs(const std::unordered_set<std::string>& vars) const;
private:
void SplitTensorToPlaces(
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
ParallelExecutorPrivate* member_;
};
......
......@@ -66,7 +66,7 @@ TEST(ProgramDesc, copy_ctor) {
for (size_t i = 0; i < global_block->OpSize(); ++i) {
auto op_origin = global_block->Op(i);
auto op_copy = global_block->Op(i);
auto op_copy = global_block_copy->Op(i);
ASSERT_EQ(op_origin->Type(), op_copy->Type());
ASSERT_EQ(op_origin->Inputs(), op_copy->Inputs());
......@@ -131,7 +131,7 @@ TEST(ProgramDescBind, serialize_and_deserialize) {
for (size_t i = 0; i < global_block->OpSize(); ++i) {
auto op_origin = global_block->Op(i);
auto op_restored = global_block->Op(i);
auto op_restored = global_block_restored->Op(i);
ASSERT_EQ(op_origin->Type(), op_restored->Type());
ASSERT_EQ(op_origin->Inputs(), op_restored->Inputs());
......
......@@ -11,8 +11,10 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/tensor_util.h"
#include <algorithm>
#include <limits>
#include <vector>
namespace paddle {
namespace framework {
......@@ -65,8 +67,6 @@ void TensorCopy(const Tensor& src, const platform::Place& dst_place,
auto dst_gpu_place = boost::get<platform::CUDAPlace>(dst_place);
auto ctx_place = ctx.GetPlace();
PADDLE_ENFORCE(platform::is_gpu_place(ctx_place));
auto ctx_gpu_place = boost::get<platform::CUDAPlace>(ctx_place);
PADDLE_ENFORCE_EQ(src_gpu_place, ctx_gpu_place);
memory::Copy(
dst_gpu_place, dst_ptr, src_gpu_place, src_ptr, size,
reinterpret_cast<const platform::CUDADeviceContext&>(ctx).stream());
......
......@@ -14,8 +14,12 @@
#include "paddle/fluid/framework/threadpool.h"
#include "gflags/gflags.h"
#include "paddle/fluid/platform/enforce.h"
DEFINE_int32(io_threadpool_size, 100,
"number of threads used for doing IO, default 100");
namespace paddle {
namespace framework {
......@@ -91,5 +95,20 @@ void ThreadPool::TaskLoop() {
}
}
std::unique_ptr<ThreadPool> ThreadPoolIO::io_threadpool_(nullptr);
std::once_flag ThreadPoolIO::io_init_flag_;
ThreadPool* ThreadPoolIO::GetInstanceIO() {
std::call_once(io_init_flag_, &ThreadPoolIO::InitIO);
return io_threadpool_.get();
}
void ThreadPoolIO::InitIO() {
if (io_threadpool_.get() == nullptr) {
// TODO(typhoonzero1986): make this configurable
io_threadpool_.reset(new ThreadPool(FLAGS_io_threadpool_size));
}
}
} // namespace framework
} // namespace paddle
......@@ -14,12 +14,12 @@ limitations under the License. */
#pragma once
#include <condition_variable>
#include <condition_variable> // NOLINT
#include <functional>
#include <future>
#include <mutex>
#include <future> // NOLINT
#include <mutex> // NOLINT
#include <queue>
#include <thread>
#include <thread> // NOLINT
#include <vector>
#include "glog/logging.h"
#include "paddle/fluid/platform/enforce.h"
......@@ -28,6 +28,22 @@ limitations under the License. */
namespace paddle {
namespace framework {
struct ExceptionHandler {
mutable std::future<std::unique_ptr<platform::EnforceNotMet>> future_;
explicit ExceptionHandler(
std::future<std::unique_ptr<platform::EnforceNotMet>>&& f)
: future_(std::move(f)) {}
void operator()() const {
auto ex = this->future_.get();
if (ex != nullptr) {
LOG(FATAL) << "The exception is thrown inside the thread pool. You "
"should use RunAndGetException to handle the exception.\n"
"The default exception handler is LOG(FATAL)."
<< ex->what();
}
}
};
// ThreadPool maintains a queue of tasks, and runs them using a fixed
// number of threads.
class ThreadPool {
......@@ -87,22 +103,6 @@ class ThreadPool {
void Wait();
private:
struct ExceptionHandler {
mutable std::future<std::unique_ptr<platform::EnforceNotMet>> future_;
explicit ExceptionHandler(
std::future<std::unique_ptr<platform::EnforceNotMet>>&& f)
: future_(std::move(f)) {}
void operator()() const {
auto ex = this->future_.get();
if (ex != nullptr) {
LOG(FATAL) << "The exception is thrown inside the thread pool. You "
"should use RunAndGetException to handle the exception.\n"
"The default exception handler is LOG(FATAL)."
<< ex->what();
}
}
};
DISABLE_COPY_AND_ASSIGN(ThreadPool);
// If the task queue is empty and avaialbe is equal to the number of
......@@ -135,6 +135,17 @@ class ThreadPool {
std::condition_variable completed_;
};
class ThreadPoolIO : ThreadPool {
public:
static ThreadPool* GetInstanceIO();
static void InitIO();
private:
// NOTE: threadpool in base will be inhereted here.
static std::unique_ptr<ThreadPool> io_threadpool_;
static std::once_flag io_init_flag_;
};
// Run a function asynchronously.
// NOTE: The function must return void. If the function need to return a value,
// you can use lambda to capture a value pointer.
......@@ -143,5 +154,10 @@ std::future<void> Async(Callback callback) {
return ThreadPool::GetInstance()->Run(callback);
}
template <typename Callback>
std::future<void> AsyncIO(Callback callback) {
return ThreadPoolIO::GetInstanceIO()->Run(callback);
}
} // namespace framework
} // namespace paddle
......@@ -21,4 +21,7 @@ endif()
if(WITH_TESTING)
add_subdirectory(tests/book)
if (TENSORRT_FOUND)
add_subdirectory(tensorrt)
endif()
endif()
......@@ -23,7 +23,7 @@ limitations under the License. */
namespace paddle {
namespace inference {
// Temporarilly add this function for exposing framework::InitDevices() when
// Temporarily add this function for exposing framework::InitDevices() when
// linking the inference shared library.
void Init(bool init_p2p) { framework::InitDevices(init_p2p); }
......
nv_test(test_tensorrt SRCS test_tensorrt.cc DEPS dynload_cuda device_context dynamic_loader)
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "NvInfer.h"
#include "cuda.h"
#include "cuda_runtime_api.h"
#include "paddle/fluid/platform/dynload/tensorrt.h"
namespace dy = paddle::platform::dynload;
class Logger : public nvinfer1::ILogger {
public:
void log(nvinfer1::ILogger::Severity severity, const char* msg) override {
switch (severity) {
case Severity::kINFO:
LOG(INFO) << msg;
break;
case Severity::kWARNING:
LOG(WARNING) << msg;
break;
case Severity::kINTERNAL_ERROR:
case Severity::kERROR:
LOG(ERROR) << msg;
break;
default:
break;
}
}
};
class ScopedWeights {
public:
ScopedWeights(float value) : value_(value) {
w.type = nvinfer1::DataType::kFLOAT;
w.values = &value_;
w.count = 1;
}
const nvinfer1::Weights& get() { return w; }
private:
float value_;
nvinfer1::Weights w;
};
// The following two API are implemented in TensorRT's header file, cannot load
// from the dynamic library. So create our own implementation and directly
// trigger the method from the dynamic library.
nvinfer1::IBuilder* createInferBuilder(nvinfer1::ILogger& logger) {
return static_cast<nvinfer1::IBuilder*>(
dy::createInferBuilder_INTERNAL(&logger, NV_TENSORRT_VERSION));
}
nvinfer1::IRuntime* createInferRuntime(nvinfer1::ILogger& logger) {
return static_cast<nvinfer1::IRuntime*>(
dy::createInferRuntime_INTERNAL(&logger, NV_TENSORRT_VERSION));
}
const char* kInputTensor = "input";
const char* kOutputTensor = "output";
// Creates a network to compute y = 2x + 3
nvinfer1::IHostMemory* CreateNetwork() {
Logger logger;
// Create the engine.
nvinfer1::IBuilder* builder = createInferBuilder(logger);
ScopedWeights weights(2.);
ScopedWeights bias(3.);
nvinfer1::INetworkDefinition* network = builder->createNetwork();
// Add the input
auto input = network->addInput(kInputTensor, nvinfer1::DataType::kFLOAT,
nvinfer1::DimsCHW{1, 1, 1});
EXPECT_NE(input, nullptr);
// Add the hidden layer.
auto layer = network->addFullyConnected(*input, 1, weights.get(), bias.get());
EXPECT_NE(layer, nullptr);
// Mark the output.
auto output = layer->getOutput(0);
output->setName(kOutputTensor);
network->markOutput(*output);
// Build the engine.
builder->setMaxBatchSize(1);
builder->setMaxWorkspaceSize(1 << 10);
auto engine = builder->buildCudaEngine(*network);
EXPECT_NE(engine, nullptr);
// Serialize the engine to create a model, then close.
nvinfer1::IHostMemory* model = engine->serialize();
network->destroy();
engine->destroy();
builder->destroy();
return model;
}
void Execute(nvinfer1::IExecutionContext& context, const float* input,
float* output) {
const nvinfer1::ICudaEngine& engine = context.getEngine();
// Two binds, input and output
ASSERT_EQ(engine.getNbBindings(), 2);
const int input_index = engine.getBindingIndex(kInputTensor);
const int output_index = engine.getBindingIndex(kOutputTensor);
// Create GPU buffers and a stream
void* buffers[2];
ASSERT_EQ(0, cudaMalloc(&buffers[input_index], sizeof(float)));
ASSERT_EQ(0, cudaMalloc(&buffers[output_index], sizeof(float)));
cudaStream_t stream;
ASSERT_EQ(0, cudaStreamCreate(&stream));
// Copy the input to the GPU, execute the network, and copy the output back.
ASSERT_EQ(0, cudaMemcpyAsync(buffers[input_index], input, sizeof(float),
cudaMemcpyHostToDevice, stream));
context.enqueue(1, buffers, stream, nullptr);
ASSERT_EQ(0, cudaMemcpyAsync(output, buffers[output_index], sizeof(float),
cudaMemcpyDeviceToHost, stream));
cudaStreamSynchronize(stream);
// Release the stream and the buffers
cudaStreamDestroy(stream);
ASSERT_EQ(0, cudaFree(buffers[input_index]));
ASSERT_EQ(0, cudaFree(buffers[output_index]));
}
TEST(TensorrtTest, BasicFunction) {
// Create the network serialized model.
nvinfer1::IHostMemory* model = CreateNetwork();
// Use the model to create an engine and an execution context.
Logger logger;
nvinfer1::IRuntime* runtime = createInferRuntime(logger);
nvinfer1::ICudaEngine* engine =
runtime->deserializeCudaEngine(model->data(), model->size(), nullptr);
model->destroy();
nvinfer1::IExecutionContext* context = engine->createExecutionContext();
// Execute the network.
float input = 1234;
float output;
Execute(*context, &input, &output);
EXPECT_EQ(output, input * 2 + 3);
// Destroy the engine.
context->destroy();
engine->destroy();
runtime->destroy();
}
......@@ -46,8 +46,8 @@ TEST(inference, image_classification) {
// Run inference on CPU
LOG(INFO) << "--- CPU Runs: ---";
TestInference<paddle::platform::CPUPlace, false>(dirname, cpu_feeds,
cpu_fetchs1, FLAGS_repeat);
TestInference<paddle::platform::CPUPlace, false, true>(
dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat);
LOG(INFO) << output1.dims();
#ifdef PADDLE_WITH_CUDA
......@@ -57,8 +57,8 @@ TEST(inference, image_classification) {
// Run inference on CUDA GPU
LOG(INFO) << "--- GPU Runs: ---";
TestInference<paddle::platform::CUDAPlace, false>(dirname, cpu_feeds,
cpu_fetchs2, FLAGS_repeat);
TestInference<paddle::platform::CUDAPlace, false, true>(
dirname, cpu_feeds, cpu_fetchs2, FLAGS_repeat);
LOG(INFO) << output2.dims();
CheckError<float>(output1, output2);
......
......@@ -89,7 +89,7 @@ void CheckError(const paddle::framework::LoDTensor& output1,
EXPECT_EQ(count, 0U) << "There are " << count << " different elements.";
}
template <typename Place, bool CreateVars = true>
template <typename Place, bool CreateVars = true, bool PrepareContext = false>
void TestInference(const std::string& dirname,
const std::vector<paddle::framework::LoDTensor*>& cpu_feeds,
const std::vector<paddle::framework::LoDTensor*>& cpu_fetchs,
......@@ -175,8 +175,15 @@ void TestInference(const std::string& dirname,
}
// Ignore the profiling results of the first run
executor.Run(*inference_program, scope, feed_targets, fetch_targets,
CreateVars);
std::unique_ptr<paddle::framework::ExecutorPrepareContext> ctx;
if (PrepareContext) {
ctx = executor.Prepare(*inference_program, 0);
executor.RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets,
CreateVars);
} else {
executor.Run(*inference_program, scope, feed_targets, fetch_targets,
CreateVars);
}
// Enable the profiler
paddle::platform::EnableProfiler(state);
......@@ -187,8 +194,15 @@ void TestInference(const std::string& dirname,
"run_inference",
paddle::platform::DeviceContextPool::Instance().Get(place));
executor.Run(*inference_program, scope, feed_targets, fetch_targets,
CreateVars);
if (PrepareContext) {
// Note: if you change the inference_program, you need to call
// executor.Prepare() again to get a new ExecutorPrepareContext.
executor.RunPreparedContext(ctx.get(), scope, feed_targets,
fetch_targets, CreateVars);
} else {
executor.Run(*inference_program, scope, feed_targets, fetch_targets,
CreateVars);
}
}
// Disable the profiler and print the timing information
......
......@@ -25,12 +25,14 @@ void GetAccumulators<paddle::platform::CUDADeviceContext>(
auto* in_num_accumulates = ctx.Input<Tensor>("in_num_accumulates");
auto* in_num_updates = ctx.Input<Tensor>("in_num_updates");
auto stream = ctx.cuda_device_context().stream();
memory::Copy(platform::CPUPlace(), old_num_accumulates_,
platform::CUDAPlace(), in_old_num_accumulates->data<int64_t>(),
sizeof(int64_t), stream);
memory::Copy(platform::CPUPlace(), num_accumulates_, platform::CUDAPlace(),
auto cuda_place =
boost::get<platform::CUDAPlace>(in_old_num_accumulates->place());
memory::Copy(platform::CPUPlace(), old_num_accumulates_, cuda_place,
in_old_num_accumulates->data<int64_t>(), sizeof(int64_t),
stream);
memory::Copy(platform::CPUPlace(), num_accumulates_, cuda_place,
in_num_accumulates->data<int64_t>(), sizeof(int64_t), stream);
memory::Copy(platform::CPUPlace(), num_updates_, platform::CUDAPlace(),
memory::Copy(platform::CPUPlace(), num_updates_, cuda_place,
in_num_updates->data<int64_t>(), sizeof(int64_t), stream);
}
......@@ -42,14 +44,16 @@ void SetAccumulators<paddle::platform::CUDADeviceContext>(
auto* out_old_num_accumulates = ctx.Output<Tensor>("out_old_num_accumulates");
auto* out_num_accumulates = ctx.Output<Tensor>("out_num_accumulates");
auto* out_num_updates = ctx.Output<Tensor>("out_num_updates");
auto cuda_place =
boost::get<platform::CUDAPlace>(out_old_num_accumulates->place());
memory::Copy(platform::CUDAPlace(), out_old_num_accumulates->data<int64_t>(),
memory::Copy(cuda_place, out_old_num_accumulates->data<int64_t>(),
platform::CPUPlace(), &old_num_accumulates_, sizeof(int64_t),
stream);
memory::Copy(platform::CUDAPlace(), out_num_accumulates->data<int64_t>(),
memory::Copy(cuda_place, out_num_accumulates->data<int64_t>(),
platform::CPUPlace(), &num_accumulates_, sizeof(int64_t),
stream);
memory::Copy(platform::CUDAPlace(), out_num_updates->data<int64_t>(),
memory::Copy(cuda_place, out_num_updates->data<int64_t>(),
platform::CPUPlace(), &num_updates_, sizeof(int64_t), stream);
}
......
......@@ -35,7 +35,8 @@ bool RPCClient::AsyncSendVariable(const std::string& ep,
const framework::Scope* p_scope = &scope;
const auto ch = GetChannel(ep_val);
framework::Async([var_name_val, p_ctx, ep_val, p_scope, time_out, ch, this] {
framework::AsyncIO([var_name_val, p_ctx, ep_val, p_scope, time_out, ch,
this] {
auto* var = p_scope->FindVar(var_name_val);
::grpc::ByteBuffer req;
......@@ -89,7 +90,8 @@ bool RPCClient::AsyncGetVariable(const std::string& ep,
const framework::Scope* p_scope = &scope;
const auto ch = GetChannel(ep_val);
framework::Async([var_name_val, ep_val, p_scope, p_ctx, time_out, ch, this] {
framework::AsyncIO([var_name_val, ep_val, p_scope, p_ctx, time_out, ch,
this] {
// prepare input
sendrecv::VariableMessage req;
req.set_varname(var_name_val);
......@@ -132,8 +134,8 @@ bool RPCClient::AsyncPrefetchVariable(const std::string& ep,
const framework::Scope* p_scope = &scope;
const auto ch = GetChannel(ep_val);
framework::Async([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx,
time_out, ch, this] {
framework::AsyncIO([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx,
time_out, ch, this] {
auto* var = p_scope->FindVar(in_var_name_val);
::grpc::ByteBuffer req;
......@@ -196,7 +198,7 @@ bool RPCClient::Wait() {
std::vector<std::future<void>> waits(req_count_);
for (int i = 0; i < req_count_; i++) {
waits[i] = framework::Async([i, &a, this] { a[i] = Proceed(); });
waits[i] = framework::AsyncIO([i, &a, this] { a[i] = Proceed(); });
}
for (int i = 0; i < req_count_; i++) {
......
......@@ -217,10 +217,10 @@ void AsyncGRPCServer::RunSyncUpdate() {
std::function<void()> prefetch_register =
std::bind(&AsyncGRPCServer::TryToRegisterNewPrefetchOne, this);
// TODO(wuyi): Run these "HandleRequest" in thread pool
t_send_.reset(
new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this,
cq_send_.get(), "cq_send", send_register)));
t_get_.reset(
new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this,
cq_get_.get(), "cq_get", get_register)));
......
......@@ -268,6 +268,7 @@ void batched_gemm<platform::CUDADeviceContext, float16>(
const CBLAS_TRANSPOSE transB, const int M, const int N, const int K,
const float16 alpha, const float16* A, const float16* B, const float16 beta,
float16* C, const int batchCount, const int strideA, const int strideB) {
#if CUDA_VERSION >= 8000
// Note that cublas follows fortran order, so the order is different from
// the cblas convention.
int lda = (transA == CblasNoTrans) ? K : M;
......@@ -289,7 +290,6 @@ void batched_gemm<platform::CUDADeviceContext, float16>(
PADDLE_ENFORCE_GE(context.GetComputeCapability(), 53,
"cublas Hgemm requires GPU compute capability >= 53");
#if CUDA_VERSION >= 8000
PADDLE_ENFORCE(platform::dynload::cublasHgemmStridedBatched(
context.cublas_handle(), cuTransB, cuTransA, N, M, K, &h_alpha, h_B, ldb,
strideB, h_A, lda, strideA, &h_beta, h_C, ldc, strideC, batchCount));
......@@ -304,6 +304,7 @@ void batched_gemm<platform::CUDADeviceContext, float>(
const CBLAS_TRANSPOSE transB, const int M, const int N, const int K,
const float alpha, const float* A, const float* B, const float beta,
float* C, const int batchCount, const int strideA, const int strideB) {
#if CUDA_VERSION >= 8000
// Note that cublas follows fortran order, so the order is different from
// the cblas convention.
int lda = (transA == CblasNoTrans) ? K : M;
......@@ -315,7 +316,6 @@ void batched_gemm<platform::CUDADeviceContext, float>(
(transB == CblasNoTrans) ? CUBLAS_OP_N : CUBLAS_OP_T;
const int strideC = M * N;
#if CUDA_VERSION >= 8000
PADDLE_ENFORCE(platform::dynload::cublasSgemmStridedBatched(
context.cublas_handle(), cuTransB, cuTransA, N, M, K, &alpha, B, ldb,
strideB, A, lda, strideA, &beta, C, ldc, strideC, batchCount));
......@@ -330,6 +330,7 @@ void batched_gemm<platform::CUDADeviceContext, double>(
const CBLAS_TRANSPOSE transB, const int M, const int N, const int K,
const double alpha, const double* A, const double* B, const double beta,
double* C, const int batchCount, const int strideA, const int strideB) {
#if CUDA_VERSION >= 8000
// Note that cublas follows fortran order, so the order is different from
// the cblas convention.
int lda = (transA == CblasNoTrans) ? K : M;
......@@ -341,7 +342,6 @@ void batched_gemm<platform::CUDADeviceContext, double>(
(transB == CblasNoTrans) ? CUBLAS_OP_N : CUBLAS_OP_T;
const int strideC = M * N;
#if CUDA_VERSION >= 8000
PADDLE_ENFORCE(platform::dynload::cublasDgemmStridedBatched(
context.cublas_handle(), cuTransB, cuTransA, N, M, K, &alpha, B, ldb,
strideB, A, lda, strideA, &beta, C, ldc, strideC, batchCount));
......
......@@ -33,28 +33,14 @@ static constexpr size_t kChannelSize = 0; // kCacheSize - 2
class DoubleBufferReader : public framework::DecoratedReader {
public:
struct Item {
Item() : ctx_(nullptr) {}
Item(Item&& b) {
payloads_ = std::move(b.payloads_);
ctx_ = std::move(b.ctx_);
}
Item& operator=(Item&& b) {
payloads_ = std::move(b.payloads_);
ctx_ = std::move(b.ctx_);
return *this;
}
std::vector<framework::LoDTensor> payloads_;
platform::DeviceContext* ctx_;
};
explicit DoubleBufferReader(
ReaderBase* reader, platform::Place target_place = platform::CPUPlace())
: DecoratedReader(reader), place_(target_place) {
cpu_tensor_cache_.resize(kCacheSize);
gpu_tensor_cache_.resize(kCacheSize);
#ifdef PADDLE_WITH_CUDA
for (size_t i = 0; i < kCacheSize; ++i) {
if (platform::is_gpu_place(place_)) {
if (platform::is_gpu_place(place_)) {
for (size_t i = 0; i < kCacheSize; ++i) {
ctxs_.emplace_back(new platform::CUDADeviceContext(
boost::get<platform::CUDAPlace>(place_)));
}
......@@ -72,7 +58,7 @@ class DoubleBufferReader : public framework::DecoratedReader {
bool HasNext() const;
void StartPrefetcher() {
channel_ = framework::MakeChannel<Item>(kChannelSize);
channel_ = framework::MakeChannel<size_t>(kChannelSize);
prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
}
......@@ -88,8 +74,10 @@ class DoubleBufferReader : public framework::DecoratedReader {
void PrefetchThreadFunc();
std::thread prefetcher_;
framework::Channel<Item>* channel_;
framework::Channel<size_t>* channel_;
platform::Place place_;
std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache_;
std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache_;
std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_;
};
......@@ -153,11 +141,14 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
out->clear();
if (HasNext()) {
Item batch;
channel_->Receive(&batch);
*out = batch.payloads_;
if (batch.ctx_) {
batch.ctx_->Wait();
size_t cached_tensor_id;
channel_->Receive(&cached_tensor_id);
if (platform::is_gpu_place(place_)) {
*out = gpu_tensor_cache_[cached_tensor_id];
ctxs_[cached_tensor_id]->Wait();
} else {
// CPU place
*out = cpu_tensor_cache_[cached_tensor_id];
}
}
}
......@@ -176,42 +167,33 @@ bool DoubleBufferReader::HasNext() const {
void DoubleBufferReader::PrefetchThreadFunc() {
VLOG(5) << "A new prefetch thread starts.";
std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache(kCacheSize);
std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache(kCacheSize);
size_t cached_tensor_id = 0;
while (true) {
Item batch;
auto& cpu_batch = cpu_tensor_cache[cached_tensor_id];
auto& cpu_batch = cpu_tensor_cache_[cached_tensor_id];
reader_->ReadNext(&cpu_batch);
if (cpu_batch.empty()) {
// The underlying reader have no next data.
break;
}
if (platform::is_gpu_place(place_)) {
auto& gpu_batch = gpu_tensor_cache[cached_tensor_id];
auto& gpu_batch = gpu_tensor_cache_[cached_tensor_id];
auto* gpu_ctx = ctxs_[cached_tensor_id].get();
gpu_batch.resize(cpu_batch.size());
for (size_t i = 0; i < cpu_batch.size(); ++i) {
framework::TensorCopy(cpu_batch[i], place_, *gpu_ctx, &gpu_batch[i]);
gpu_batch[i].set_lod(cpu_batch[i].lod());
}
batch.payloads_ = gpu_batch;
batch.ctx_ = gpu_ctx;
} else {
// CPUPlace
batch.payloads_ = cpu_batch;
}
++cached_tensor_id;
cached_tensor_id %= kCacheSize;
try {
channel_->Send(&batch);
size_t tmp = cached_tensor_id;
channel_->Send(&tmp);
} catch (paddle::platform::EnforceNotMet e) {
VLOG(5) << "WARNING: The double buffer channel has been closed. The "
"prefetch thread will terminate.";
break;
}
++cached_tensor_id;
cached_tensor_id %= kCacheSize;
}
channel_->Close();
VLOG(5) << "Prefetch thread terminates.";
......
......@@ -147,6 +147,7 @@ class ReshapeKernel : public framework::OpKernel<T> {
if (!inplace) {
out->mutable_data<T>(ctx.GetPlace());
framework::TensorCopy(*in, ctx.GetPlace(), ctx.device_context(), out);
ctx.device_context().Wait();
// TensorCopy will resize to in_dims.
out->Resize(out_dims);
} else {
......@@ -169,6 +170,7 @@ class ReshapeGradKernel : public framework::OpKernel<T> {
auto in_dims = d_x->dims();
if (!inplace) {
framework::TensorCopy(*d_out, ctx.GetPlace(), ctx.device_context(), d_x);
ctx.device_context().Wait();
d_x->Resize(in_dims);
} else {
d_x->ShareDataWith(*d_out);
......
......@@ -13,7 +13,6 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/scale_op.h"
#include <string>
namespace paddle {
......
......@@ -12,9 +12,10 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "gather.cu.h"
#include "paddle/fluid/operators/gather.cu.h"
#include "paddle/fluid/operators/gather_op.h"
#include "scatter.cu.h"
#include "paddle/fluid/operators/scatter.cu.h"
#include "paddle/fluid/operators/scatter_op.h"
namespace paddle {
namespace operators {
......
......@@ -13,10 +13,10 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include "gather.h"
#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h"
#include "scatter.h"
#include "paddle/fluid/operators/gather.h"
#include "paddle/fluid/operators/scatter.h"
namespace paddle {
namespace operators {
......
......@@ -13,44 +13,48 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/scatter.h"
#include "paddle/fluid/framework/ddim.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/platform/place.h"
#include <gtest/gtest.h>
#include <iostream>
#include <string>
#include "paddle/fluid/framework/ddim.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/platform/place.h"
TEST(scatter, ScatterUpdate) {
using namespace paddle::framework;
using namespace paddle::platform;
using namespace paddle::operators;
// using namespace paddle::framework;
// using namespace paddle::platform;
// using namespace paddle::operators;
Tensor* src = new Tensor();
Tensor* index = new Tensor();
Tensor* output = new Tensor();
paddle::framework::Tensor* src = new paddle::framework::Tensor();
paddle::framework::Tensor* index = new paddle::framework::Tensor();
paddle::framework::Tensor* output = new paddle::framework::Tensor();
float* p_src = nullptr;
int* p_index = nullptr;
p_src = src->mutable_data<float>(make_ddim({1, 4}), CPUPlace());
p_index = index->mutable_data<int>(make_ddim({1}), CPUPlace());
p_src = src->mutable_data<float>(paddle::framework::make_ddim({1, 4}),
paddle::platform::CPUPlace());
p_index = index->mutable_data<int>(paddle::framework::make_ddim({1}),
paddle::platform::CPUPlace());
for (size_t i = 0; i < 4; ++i) p_src[i] = float(i);
for (size_t i = 0; i < 4; ++i) p_src[i] = static_cast<float>(i);
p_index[0] = 1;
float* p_output = output->mutable_data<float>(make_ddim({4, 4}), CPUPlace());
float* p_output = output->mutable_data<float>(
paddle::framework::make_ddim({4, 4}), paddle::platform::CPUPlace());
auto* cpu_place = new paddle::platform::CPUPlace();
paddle::platform::CPUDeviceContext ctx(*cpu_place);
ScatterAssign<float>(ctx, *src, *index, output);
paddle::operators::ScatterAssign<float>(ctx, *src, *index, output);
for (size_t i = 0; i < 4; ++i) EXPECT_EQ(p_output[i], float(0));
for (size_t i = 0; i < 4; ++i) EXPECT_EQ(output->data<float>()[i], float(0));
for (size_t i = 4; i < 8; ++i) EXPECT_EQ(p_output[i], float(i - 4));
for (size_t i = 0; i < 4; ++i) EXPECT_EQ(p_output[i], 0.0f);
for (size_t i = 0; i < 4; ++i) EXPECT_EQ(output->data<float>()[i], 0.0f);
for (size_t i = 4; i < 8; ++i) {
EXPECT_EQ(p_output[i], static_cast<float>(i - 4));
}
for (size_t i = 4; i < 8; ++i)
EXPECT_EQ(output->data<float>()[i], float(i - 4));
for (size_t i = 8; i < 16; ++i) EXPECT_EQ(p_output[i], float(0));
for (size_t i = 8; i < 16; ++i) EXPECT_EQ(output->data<float>()[i], float(0));
EXPECT_EQ(output->data<float>()[i], static_cast<float>(i - 4));
for (size_t i = 8; i < 16; ++i) EXPECT_EQ(p_output[i], 0.0f);
for (size_t i = 8; i < 16; ++i) EXPECT_EQ(output->data<float>()[i], 0.0f);
delete src;
delete index;
......
......@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <future> // NOLINT
#include <ostream>
#include "paddle/fluid/framework/data_type.h"
......@@ -19,7 +20,6 @@ limitations under the License. */
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include <future>
#include "paddle/fluid/operators/detail/grpc_client.h"
namespace paddle {
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <future>
#include <future> // NOLINT
#include <ostream>
#include "paddle/fluid/framework/data_type.h"
......
......@@ -12,6 +12,9 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <string>
namespace paddle {
namespace operators {
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <vector>
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/strided_memcpy.h"
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <algorithm>
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/context_project.h"
#include "paddle/fluid/operators/math/math_function.h"
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/sequence_erase_op.h"
#include <vector>
namespace paddle {
namespace operators {
......
......@@ -14,6 +14,7 @@ limitations under the License. */
#pragma once
#include <vector>
#include "paddle/fluid/framework/op_registry.h"
namespace paddle {
......
......@@ -84,12 +84,11 @@ class SequenceExpandOp : public framework::OperatorWithKernel {
}
}
out_dims[0] = out_first_dim;
ctx->SetOutputDim("Out", out_dims);
} else {
out_dims[0] = -1;
ctx->SetOutputDim("Out", out_dims);
ctx->ShareLoD("X", /*->*/ "Out");
}
ctx->SetOutputDim("Out", out_dims);
ctx->ShareLoD("X", /*->*/ "Out");
}
};
......
......@@ -12,8 +12,135 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#define EIGEN_USE_GPU
#include <algorithm>
#include "paddle/fluid/operators/sequence_expand_op.h"
#include "paddle/fluid/platform/cuda_helper.h"
namespace paddle {
namespace operators {
using LoDTensor = framework::LoDTensor;
template <typename T>
__global__ void sequence_expand_kernel(const T* x_data, const size_t* x_lod,
const size_t* ref_lod,
const size_t* offset,
const size_t lod_size,
/* default=1,
the instance length*/
const int x_item_length, T* out_data) {
int bid = blockIdx.x;
if (bid >= lod_size - 1) return;
int x_item_count = x_lod[bid + 1] - x_lod[bid];
int repeats = ref_lod[bid + 1] - ref_lod[bid];
int out_offset = static_cast<int>(offset[bid]);
int x_offset = x_lod[bid];
for (int tid_z = threadIdx.z; tid_z < repeats; tid_z += blockDim.z) {
for (int tid_y = threadIdx.y; tid_y < x_item_count; tid_y += blockDim.y) {
for (int tid_x = threadIdx.x; tid_x < x_item_length;
tid_x += blockDim.x) {
out_data[(out_offset + tid_z * x_item_count + tid_y) * x_item_length +
tid_x] = x_data[(x_offset + tid_y) * x_item_length + tid_x];
}
}
}
}
template <typename T>
__global__ void sequence_expand_grad_kernel(
const T* dout_data, const size_t* ref_lod, const size_t* dx_lod,
const size_t* offset, const size_t lod_size,
/* default=1,
the instance length*/
const int x_item_length, T* dx_data) {
int bid = blockIdx.x;
if (bid >= lod_size - 1) return;
int x_item_count = dx_lod[bid + 1] - dx_lod[bid];
int repeats = ref_lod[bid + 1] - ref_lod[bid];
int out_offset = static_cast<int>(offset[bid]);
int x_offset = dx_lod[bid];
for (int tid_z = threadIdx.z; tid_z < repeats; tid_z += blockDim.z) {
for (int tid_y = threadIdx.y; tid_y < x_item_count; tid_y += blockDim.y) {
for (int tid_x = threadIdx.x; tid_x < x_item_length;
tid_x += blockDim.x) {
platform::CudaAtomicAdd(
&dx_data[(x_offset + tid_y) * x_item_length + tid_x],
dout_data[(out_offset + tid_z * x_item_count + tid_y) *
x_item_length +
tid_x]);
}
}
}
}
void GetOutputOffset(const framework::Vector<size_t>& x_lod,
const framework::Vector<size_t>& ref_lod,
framework::Vector<size_t>* out_offset) {
size_t offset = 0;
int lod_size = static_cast<int>(x_lod.size());
for (int i = 0; i < static_cast<int>(x_lod.size()); ++i) {
(*out_offset)[i] = offset;
if (i < lod_size - 1) {
offset += (ref_lod[i + 1] - ref_lod[i]) * (x_lod[i + 1] - x_lod[i]);
}
}
}
template <typename T>
struct SequenceExpandFunctor<platform::CUDADeviceContext, T> {
void operator()(
const platform::CUDADeviceContext& context, const LoDTensor& x,
const framework::Vector<size_t>& x_lod, /*expand source lod*/
const framework::Vector<size_t>& ref_lod, /*expand referenced lod*/
LoDTensor* out) {
int x_item_length = x.numel() / x.dims()[0];
framework::Vector<size_t> out_offset(x_lod.size());
GetOutputOffset(x_lod, ref_lod, &out_offset);
int thread_x = std::min(32, std::max(static_cast<int>(ref_lod.size()), 16));
int thread_y = 16;
int thread_z = 1024 / thread_x / thread_y;
int block_x = static_cast<int>(ref_lod.size());
dim3 block_size(thread_x, thread_y, thread_z);
dim3 grid_size(block_x, 1);
sequence_expand_kernel<<<grid_size, block_size, 0, context.stream()>>>(
x.data<T>(), x_lod.CUDAData(context.GetPlace()),
ref_lod.CUDAData(context.GetPlace()),
out_offset.CUDAData(context.GetPlace()), x_lod.size(), x_item_length,
out->mutable_data<T>(context.GetPlace()));
}
};
template <typename T>
struct SequenceExpandGradFunctor<platform::CUDADeviceContext, T> {
void operator()(const platform::CUDADeviceContext& context,
const LoDTensor& dout,
const framework::Vector<size_t>& x_lod, /*expand source lod*/
const framework::Vector<size_t>& ref_lod, /*expand based lod*/
LoDTensor* dx) {
int x_item_length = framework::product(dx->dims()) / dx->dims()[0];
framework::Vector<size_t> out_offset(x_lod.size());
GetOutputOffset(x_lod, ref_lod, &out_offset);
int thread_x = std::min(32, std::max(static_cast<int>(ref_lod.size()), 16));
int thread_y = 16;
int thread_z = 1024 / thread_x / thread_y;
int block_x = static_cast<int>(ref_lod.size());
dim3 block_size(thread_x, thread_y, thread_z);
dim3 grid_size(block_x, 1);
sequence_expand_grad_kernel<<<grid_size, block_size, 0, context.stream()>>>(
dout.data<T>(), ref_lod.CUDAData(context.GetPlace()),
x_lod.CUDAData(context.GetPlace()),
out_offset.CUDAData(context.GetPlace()), ref_lod.size(), x_item_length,
dx->mutable_data<T>(context.GetPlace()));
}
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OP_CUDA_KERNEL(
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <numeric> // std::iota
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/memory/memcpy.h"
......@@ -26,6 +27,57 @@ template <typename T, int MajorType = Eigen::RowMajor,
typename IndexType = Eigen::DenseIndex>
using EigenMatrix = framework::EigenMatrix<T, MajorType, IndexType>;
template <typename DeviceContext, typename T>
struct SequenceExpandFunctor {
void operator()(
const DeviceContext& ctx, const LoDTensor& x,
const framework::Vector<size_t>& x_lod, /*expand source lod*/
const framework::Vector<size_t>& ref_lod, /*expand referenced lod*/
LoDTensor* out);
};
template <typename DeviceContext, typename T>
struct SequenceExpandGradFunctor {
void operator()(
const DeviceContext& ctx, const LoDTensor& dout,
const framework::Vector<size_t>& x_lod, /*expand source lod*/
const framework::Vector<size_t>& ref_lod, /*expand referenced lod*/
LoDTensor* dx);
};
template <typename T>
struct SequenceExpandFunctor<platform::CPUDeviceContext, T> {
void operator()(
const platform::CPUDeviceContext& context, const LoDTensor& x,
const framework::Vector<size_t>& x_lod, /*expand source lod*/
const framework::Vector<size_t>& ref_lod, /*expand referenced lod*/
LoDTensor* out) {
int out_offset = 0;
auto& eigen_place = *context.eigen_device();
for (size_t i = 1; i < ref_lod.size(); ++i) {
int repeat_num = ref_lod[i] - ref_lod[i - 1];
int x_start = x_lod[i - 1];
int x_end = x_lod[i];
int x_seq_len = x_end - x_start;
if (repeat_num > 0) {
auto x_sub_tensor = x.Slice(x_start, x_end);
x_sub_tensor.Resize({1, x_sub_tensor.numel()});
int out_start = out_offset;
if (out->lod().size() == 1) {
out_start = out->lod()[0][out_offset];
}
auto out_sub_tensor =
out->Slice(out_start, out_start + x_seq_len * repeat_num);
out_sub_tensor.Resize({repeat_num, x_sub_tensor.dims()[1]});
EigenMatrix<T>::From(out_sub_tensor).device(eigen_place) =
EigenMatrix<T>::From(x_sub_tensor)
.broadcast(Eigen::array<int, 2>({{repeat_num, 1}}));
}
out_offset += repeat_num;
}
}
};
template <typename DeviceContext, typename T>
class SequenceExpandKernel : public framework::OpKernel<T> {
public:
......@@ -47,45 +99,36 @@ class SequenceExpandKernel : public framework::OpKernel<T> {
return;
}
auto& out_lod = *out->mutable_lod();
// x lod level is at most 1.
framework::Vector<size_t> out_lod;
if (x_lod.size() == 1) {
out_lod.resize(1);
out_lod[0] = {0};
}
int out_offset = 0;
auto& eigen_place =
*context.template device_context<DeviceContext>().eigen_device();
for (size_t i = 1; i < y_lod[ref_level].size(); ++i) {
int repeat_num = y_lod[ref_level][i] - y_lod[ref_level][i - 1];
int x_start = i - 1;
int x_end = i;
if (x_lod.size() == 1) {
x_start = x_lod[0][i - 1];
x_end = x_lod[0][i];
}
int x_seq_len = x_end - x_start;
if (repeat_num > 0) {
auto x_sub_tensor = x->Slice(x_start, x_end);
x_sub_tensor.Resize({1, x_sub_tensor.numel()});
int out_start = out_offset;
if (x_lod.size() == 1) {
out_start = out_lod[0][out_offset];
}
auto out_sub_tensor =
out->Slice(out_start, out_start + x_seq_len * repeat_num);
out_sub_tensor.Resize({repeat_num, x_sub_tensor.dims()[1]});
EigenMatrix<T>::From(out_sub_tensor).device(eigen_place) =
EigenMatrix<T>::From(x_sub_tensor)
.broadcast(Eigen::array<int, 2>({{repeat_num, 1}}));
}
for (int j = 0; j < repeat_num; ++j) {
if (x_lod.size() == 1) {
out_lod[0].push_back(out_lod[0].back() + x_seq_len);
out_lod.push_back(0);
int out_offset = 0;
for (size_t i = 1; i < y_lod[ref_level].size(); ++i) {
int repeat_num = y_lod[ref_level][i] - y_lod[ref_level][i - 1];
int x_start = x_lod[0][i - 1];
int x_end = x_lod[0][i];
int x_seq_len = x_end - x_start;
for (int j = 0; j < repeat_num; ++j) {
out_lod.push_back(out_lod.back() + x_seq_len);
out_offset++;
}
out_offset++;
}
// write lod to out if x has lod
auto& ref_lod = *out->mutable_lod();
ref_lod[0] = out_lod;
}
framework::Vector<size_t> ref_x_lod;
if (x->lod().size() == 1) {
ref_x_lod = x->lod()[0];
} else {
// x_lod doesn't has lod, use fake x lod, level = 0
ref_x_lod.resize(x->dims()[0] + 1);
std::iota(ref_x_lod.begin(), ref_x_lod.end(), 0);
}
SequenceExpandFunctor<DeviceContext, T> functor;
functor(context.template device_context<DeviceContext>(), *x, ref_x_lod,
y_lod[ref_level], out);
}
};
......@@ -101,6 +144,36 @@ class SequenceExpandKernel : public framework::OpKernel<T> {
* Grad(X).lod = Input(X).lod
*
* */
template <typename T>
struct SequenceExpandGradFunctor<platform::CPUDeviceContext, T> {
void operator()(
const platform::CPUDeviceContext& context, const LoDTensor& dout,
const framework::Vector<size_t>& x_lod, /*expand source lod*/
const framework::Vector<size_t>& ref_lod, /*expand referenced lod*/
LoDTensor* dx) {
math::SetConstant<platform::CPUDeviceContext, T> set_zero;
set_zero(context, dx, static_cast<T>(0));
int dout_offset = 0;
for (size_t i = 1; i < ref_lod.size(); ++i) {
int repeat_num = ref_lod[i] - ref_lod[i - 1];
if (repeat_num > 0) {
int x_start = x_lod[i - 1];
int x_end = x_lod[i];
int x_seq_len = x_end - x_start;
auto dx_sub = dx->Slice(x_start, x_end);
dx_sub.Resize(flatten_to_1d(dx_sub.dims()));
int dout_end = dout_offset + repeat_num * x_seq_len;
auto dout_sub = dout.Slice(dout_offset, dout_end);
dout_sub.Resize({repeat_num, dx_sub.dims()[0]});
math::ColwiseSum<platform::CPUDeviceContext, T> col_sum;
col_sum(context, dout_sub, &dx_sub);
dout_offset += repeat_num * x_seq_len;
}
}
}
};
template <typename DeviceContext, typename T>
class SequenceExpandGradKernel : public framework::OpKernel<T> {
public:
......@@ -114,43 +187,26 @@ class SequenceExpandGradKernel : public framework::OpKernel<T> {
g_x->mutable_data<T>(context.GetPlace());
g_x->set_lod(x->lod());
auto& x_lod = x->lod();
auto& y_lod = y->lod();
if (ref_level == -1) ref_level = y_lod.size() - 1;
// just copy the gradient
if (y_lod[ref_level].size() <= 1) {
framework::TensorCopy(*g_out, context.GetPlace(), g_x);
return;
}
auto& dev_ctx = context.template device_context<DeviceContext>();
math::SetConstant<DeviceContext, T> set_zero;
set_zero(dev_ctx, g_x, static_cast<T>(0));
int g_out_offset = 0;
for (size_t i = 1; i < y_lod[ref_level].size(); ++i) {
int repeat_num = y_lod[ref_level][i] - y_lod[ref_level][i - 1];
if (repeat_num > 0) {
int x_start = i - 1;
int x_end = i;
if (x_lod.size() == 1) {
x_start = x_lod[0][i - 1];
x_end = x_lod[0][i];
}
int x_seq_len = x_end - x_start;
auto g_x_sub = g_x->Slice(x_start, x_end);
g_x_sub.Resize(flatten_to_1d(g_x_sub.dims()));
int g_out_end = g_out_offset + repeat_num * x_seq_len;
auto g_out_sub = g_out->Slice(g_out_offset, g_out_end);
g_out_sub.Resize({repeat_num, g_x_sub.dims()[0]});
math::ColwiseSum<DeviceContext, T> col_sum;
col_sum(dev_ctx, g_out_sub, &g_x_sub);
g_out_offset += repeat_num * x_seq_len;
}
framework::Vector<size_t> ref_x_lod;
framework::Vector<size_t> ref_lod = y_lod[ref_level];
if (x->lod().size() == 1) {
ref_x_lod = x->lod()[0];
} else {
// x_lod doesn't has lod, use fake x lod, level = 0
ref_x_lod.resize(x->dims()[0] + 1);
std::iota(ref_x_lod.begin(), ref_x_lod.end(), 0);
}
SequenceExpandGradFunctor<DeviceContext, T> functor;
functor(context.template device_context<DeviceContext>(), *g_out, ref_x_lod,
ref_lod, g_x);
}
};
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/sequence_pool_op.h"
#include <string>
namespace paddle {
namespace operators {
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <string>
#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/math_function.h"
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/sequence_softmax_op.h"
#include <string>
namespace paddle {
namespace operators {
......
......@@ -65,7 +65,8 @@ class SGDOpKernel : public framework::OpKernel<T> {
auto &grad_rows = grad->rows();
size_t grad_row_numel = grad_value.numel() / grad_rows.size();
PADDLE_ENFORCE_EQ(grad_row_numel, param_out->numel() / grad_height);
PADDLE_ENFORCE_EQ(static_cast<int64_t>(grad_row_numel),
param_out->numel() / grad_height);
auto *grad_data = grad_value.data<T>();
auto *out_data = param_out->data<T>();
......@@ -73,7 +74,7 @@ class SGDOpKernel : public framework::OpKernel<T> {
for (size_t i = 0; i < grad_rows.size(); i++) {
PADDLE_ENFORCE(grad_rows[i] < grad_height,
"Input rows index should less than height");
for (int64_t j = 0; j < grad_row_numel; j++) {
for (size_t j = 0; j < grad_row_numel; j++) {
out_data[grad_rows[i] * grad_row_numel + j] -=
lr[0] * grad_data[i * grad_row_numel + j];
}
......@@ -107,7 +108,7 @@ class SGDOpKernel : public framework::OpKernel<T> {
PADDLE_ENFORCE(grad.rows()[i] < grad.height(),
"Input rows index should less than height");
int64_t id_index = param.index(grad.rows()[i]);
for (int64_t j = 0; j < grad_row_width; j++) {
for (size_t j = 0; j < grad_row_width; j++) {
out_data[id_index * grad_row_width + j] -=
lr[0] * grad_data[i * grad_row_width + j];
}
......
......@@ -12,12 +12,11 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <iostream>
#include "mkldnn.hpp"
#include "paddle/fluid/operators/softmax_op.h"
#include "paddle/fluid/platform/mkldnn_helper.h"
#include <iostream>
namespace paddle {
namespace operators {
......@@ -63,9 +62,11 @@ class SoftmaxMKLDNNKernel : public paddle::framework::OpKernel<T> {
softmax_md, 1 /*dim: C*/);
// create memory primitives
auto softmax_src_memory =
memory({softmax_md, mkldnn_engine}, (void*)input_data);
memory({softmax_md, mkldnn_engine},
static_cast<void*>(const_cast<T*>(input_data)));
auto softmax_dst_memory =
memory({softmax_md, mkldnn_engine}, (void*)output_data);
memory({softmax_md, mkldnn_engine},
static_cast<void*>(const_cast<T*>(output_data)));
auto softmax_prim_desc =
softmax_forward::primitive_desc(softmax_desc, mkldnn_engine);
auto softmax = softmax_forward(softmax_prim_desc, softmax_src_memory,
......
......@@ -60,7 +60,9 @@ class SplitIdsOpKernel : public framework::OpKernel<T> {
} else if (ids_var->IsType<framework::SelectedRows>()) {
const auto *ids_selected_rows = ctx.Input<framework::SelectedRows>("Ids");
auto &ids_dims = ids_selected_rows->value().dims();
PADDLE_ENFORCE_EQ(ids_dims[0], ids_selected_rows->rows().size(), "");
PADDLE_ENFORCE_EQ(ids_dims[0],
static_cast<int64_t>(ids_selected_rows->rows().size()),
"");
const T *ids = ids_selected_rows->value().data<T>();
const auto &ids_rows = ids_selected_rows->rows();
auto outs = ctx.MultiOutput<framework::SelectedRows>("Out");
......@@ -77,7 +79,7 @@ class SplitIdsOpKernel : public framework::OpKernel<T> {
framework::DDim ddim = framework::make_ddim(
{static_cast<int64_t>(out->rows().size()), row_width});
T *output = out->mutable_value()->mutable_data<T>(ddim, place);
for (size_t i = 0; i < ddim[0]; ++i) {
for (int64_t i = 0; i < ddim[0]; ++i) {
memcpy(output + i * row_width, ids + out->rows()[i] * row_width,
row_width * sizeof(T));
}
......
......@@ -14,7 +14,7 @@ limitations under the License. */
#pragma once
#include <chrono>
#include <chrono> // NOLINT
#include <vector>
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/strided_memcpy.h"
......
......@@ -24,7 +24,19 @@ template <typename T>
class CPUUniformRandomKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
auto* tensor = ctx.Output<framework::Tensor>("Out");
framework::Tensor* tensor = nullptr;
auto out_var = ctx.OutputVar("Out");
if (out_var->IsType<framework::LoDTensor>()) {
tensor = out_var->GetMutable<framework::LoDTensor>();
} else if (out_var->IsType<framework::SelectedRows>()) {
auto shape = ctx.Attr<std::vector<int>>("shape");
tensor = out_var->GetMutable<framework::SelectedRows>()->mutable_value();
tensor->Resize(framework::make_ddim(shape));
} else {
PADDLE_THROW(
"uniform_random_op's output only"
"supports SelectedRows and Tensor");
}
T* data = tensor->mutable_data<T>(ctx.GetPlace());
unsigned int seed = static_cast<unsigned int>(ctx.Attr<int>("seed"));
std::minstd_rand engine;
......
......@@ -43,7 +43,19 @@ template <typename T>
class GPUUniformRandomKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& context) const override {
auto* tensor = context.Output<framework::Tensor>("Out");
framework::Tensor* tensor = nullptr;
auto out_var = context.OutputVar("Out");
if (out_var->IsType<framework::LoDTensor>()) {
tensor = out_var->GetMutable<framework::LoDTensor>();
} else if (out_var->IsType<framework::SelectedRows>()) {
auto shape = context.Attr<std::vector<int>>("shape");
tensor = out_var->GetMutable<framework::SelectedRows>()->mutable_value();
tensor->Resize(framework::make_ddim(shape));
} else {
PADDLE_THROW(
"uniform_random_op's output only"
"supports SelectedRows and Tensor");
}
T* data = tensor->mutable_data<T>(context.GetPlace());
unsigned int seed = static_cast<unsigned int>(context.Attr<int>("seed"));
if (seed == 0) {
......
......@@ -11,12 +11,13 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <cuda_profiler_api.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <string>
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace platform {
......
......@@ -18,16 +18,22 @@ limitations under the License. */
#error device_ptr_cast must be include by .cu file
#endif
#include <thrust/device_ptr.h>
#include <type_traits> // For std::remove_pointer and std::is_pointer.
#include "thrust/device_ptr.h"
namespace paddle {
namespace platform {
namespace details {
// PointerToThrustDevicePtr has two speicalizations, one casts a (CUDA
// device) pointer into thrust::device_ptr, the other keeps rest types
// un-casted.
template <typename T, bool is_ptr>
struct DevicePtrCast;
struct PointerToThrustDevicePtr;
template <typename T>
struct DevicePtrCast<T, true> {
struct PointerToThrustDevicePtr<T, true> {
using ELEM = typename std::remove_pointer<T>::type;
using RTYPE = thrust::device_ptr<ELEM>;
......@@ -37,17 +43,26 @@ struct DevicePtrCast<T, true> {
};
template <typename T>
struct DevicePtrCast<T, false> {
struct PointerToThrustDevicePtr<T, false> {
using RTYPE = T;
inline RTYPE operator()(RTYPE it) const { return it; }
};
// Cast T to thrust::device_ptr if T is a pointer.
// Otherwise, e.g., T is a iterator, return T itself.
// CastToCUDATransformIterator casts a pointer to thrust::device_ptr
// so it could be used as the iterator of thrust::transform. It
// doesn't cast other types.
//
// We need CastToCUDATransformIterator because it is often that we
// want to use device memory pointers as transform iterators, e.g., to
// transform a block of float32 to float16. In this case, we want
// CastToCUDATransformIterator to cast float16/32 pointers to
// thrust::device_ptr, otherwise they cannot work as the iterator
// required by thrust::transform. At the same time, we don't want to
// cast thrust::device_ptr to thrust::device_ptr repeatedly.
template <typename T>
auto DevPtrCast(T t) ->
typename DevicePtrCast<T, std::is_pointer<T>::value>::RTYPE {
DevicePtrCast<T, std::is_pointer<T>::value> cast;
auto CastToCUDATransformIterator(T t) ->
typename PointerToThrustDevicePtr<T, std::is_pointer<T>::value>::RTYPE {
PointerToThrustDevicePtr<T, std::is_pointer<T>::value> cast;
return cast(t);
}
......
......@@ -175,7 +175,7 @@ CUDADeviceContext::~CUDADeviceContext() {
Place CUDADeviceContext::GetPlace() const { return place_; }
void CUDADeviceContext::Wait() const {
std::lock_guard<std::mutex> guard(mutex_);
std::lock_guard<std::recursive_mutex> guard(mutex_);
PADDLE_ENFORCE(cudaStreamSynchronize(stream_));
PADDLE_ENFORCE(cudaGetLastError());
}
......
......@@ -98,13 +98,20 @@ class CUDADeviceContext : public DeviceContext {
/*! \brief Return cuda stream in the device context. */
cudaStream_t stream() const;
template <typename Callback>
void RecordEvent(cudaEvent_t ev, Callback callback) {
std::lock_guard<std::recursive_mutex> guard(mutex_);
callback();
PADDLE_ENFORCE(cudaEventRecord(ev, stream_));
}
private:
CUDAPlace place_;
std::unique_ptr<Eigen::GpuDevice> eigen_device_;
std::unique_ptr<EigenCudaStreamDevice> eigen_stream_;
mutable std::mutex mutex_;
mutable std::recursive_mutex mutex_;
cudaStream_t stream_;
cudnnHandle_t cudnn_handle_;
cublasHandle_t cublas_handle_;
......
cc_library(dynamic_loader SRCS dynamic_loader.cc DEPS glog gflags enforce)
list(APPEND CUDA_SRCS cublas.cc cudnn.cc curand.cc nccl.cc)
if (TENSORRT_FOUND)
list(APPEND CUDA_SRCS tensorrt.cc)
endif()
configure_file(cupti_lib_path.h.in ${CMAKE_CURRENT_BINARY_DIR}/cupti_lib_path.h)
if (CUPTI_FOUND)
list(APPEND CUDA_SRCS cupti.cc)
......
......@@ -45,6 +45,10 @@ DEFINE_string(nccl_dir, "",
DEFINE_string(cupti_dir, "", "Specify path for loading cupti.so.");
DEFINE_string(
tensorrt_dir, "",
"Specify path for loading tensorrt library, such as libnvinfer.so.");
namespace paddle {
namespace platform {
namespace dynload {
......@@ -194,6 +198,14 @@ void* GetNCCLDsoHandle() {
#endif
}
void* GetTensorRtDsoHandle() {
#if defined(__APPLE__) || defined(__OSX__)
return GetDsoHandleFromSearchPath(FLAGS_tensorrt_dir, "libnvinfer.dylib");
#else
return GetDsoHandleFromSearchPath(FLAGS_tensorrt_dir, "libnvinfer.so");
#endif
}
} // namespace dynload
} // namespace platform
} // namespace paddle
......@@ -25,6 +25,7 @@ void* GetCurandDsoHandle();
void* GetWarpCTCDsoHandle();
void* GetLapackDsoHandle();
void* GetNCCLDsoHandle();
void* GetTensorRtDsoHandle();
} // namespace dynload
} // namespace platform
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/platform/dynload/tensorrt.h"
namespace paddle {
namespace platform {
namespace dynload {
std::once_flag tensorrt_dso_flag;
void *tensorrt_dso_handle;
#define DEFINE_WRAP(__name) DynLoad__##__name __name
TENSORRT_RAND_ROUTINE_EACH(DEFINE_WRAP);
} // namespace dynload
} // namespace platform
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <NvInfer.h>
#include <dlfcn.h>
#include <mutex> // NOLINT
#include "paddle/fluid/platform/dynload/dynamic_loader.h"
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace platform {
namespace dynload {
extern std::once_flag tensorrt_dso_flag;
extern void* tensorrt_dso_handle;
#ifdef PADDLE_USE_DSO
#define DECLARE_DYNAMIC_LOAD_TENSORRT_WRAP(__name) \
struct DynLoad__##__name { \
template <typename... Args> \
auto operator()(Args... args) -> decltype(__name(args...)) { \
using tensorrt_func = decltype(__name(args...)) (*)(Args...); \
std::call_once(tensorrt_dso_flag, []() { \
tensorrt_dso_handle = \
paddle::platform::dynload::GetTensorRtDsoHandle(); \
PADDLE_ENFORCE(tensorrt_dso_handle, "load tensorrt so failed"); \
}); \
void* p_##__name = dlsym(tensorrt_dso_handle, #__name); \
PADDLE_ENFORCE(p_##__name, "load %s failed", #__name); \
return reinterpret_cast<tensorrt_func>(p_##__name)(args...); \
} \
}; \
extern DynLoad__##__name __name
#else
#define DECLARE_DYNAMIC_LOAD_TENSORRT_WRAP(__name) \
struct DynLoad__##__name { \
template <typename... Args> \
tensorrtResult_t operator()(Args... args) { \
return __name(args...); \
} \
}; \
extern DynLoad__##__name __name
#endif
#define TENSORRT_RAND_ROUTINE_EACH(__macro) \
__macro(createInferBuilder_INTERNAL); \
__macro(createInferRuntime_INTERNAL);
TENSORRT_RAND_ROUTINE_EACH(DECLARE_DYNAMIC_LOAD_TENSORRT_WRAP)
} // namespace dynload
} // namespace platform
} // namespace paddle
......@@ -873,6 +873,11 @@ HOSTDEVICE inline bool(isfinite)(const float16& a) {
return !((isnan)(a)) && !((isinf)(a));
}
inline std::ostream& operator<<(std::ostream& os, const float16& a) {
os << static_cast<float>(a);
return os;
}
} // namespace platform
} // namespace paddle
......
......@@ -141,5 +141,10 @@ TEST(float16, lod_tensor_cpu) {
}
}
TEST(float16, print) {
float16 a = float16(1.0f);
std::cout << a << std::endl;
}
} // namespace platform
} // namespace paddle
......@@ -14,29 +14,44 @@ limitations under the License. */
#pragma once
#include <algorithm>
#include <type_traits>
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/hostdevice.h"
#include "paddle/fluid/platform/place.h"
#include <algorithm>
#include <type_traits>
#ifdef __NVCC__
#include <thrust/execution_policy.h>
#include <thrust/transform.h>
#include "paddle/fluid/platform/details/device_ptr_cast.h"
#include "paddle/fluid/platform/details/cuda_transform_iterator_cast.h"
#endif
namespace paddle {
namespace platform {
// Transform on host or device. It provides the same API in std library.
// Transform applys a unary or a binary functor on each element in a
// range defined by a pair of iterators.
//
// - The specialization for CPU calls std::transform.
// - The specialization for CUDA calls thrust::tranform.
//
// NOTE: We need to define InputIter and OutputIter defined as
// different types, because the InputIter points op's inputs and
// OutputIter pints to op's outputs.
//
// NOTE: We don't assume that InputIter to be const InputType* and
// OutputIter to be OutputType*, because we might use a iterator
// class, paddle::fluid::operators::RowwiseTRansformIterator.
template <typename DeviceContext>
struct Transform {
// The unary version.
template <typename InputIter, typename OutputIter, typename UnaryOperation>
void operator()(const DeviceContext& context, InputIter first, InputIter last,
OutputIter result, UnaryOperation op);
// The binary version.
template <typename InputIter1, typename InputIter2, typename OutputIter,
typename BinaryOperation>
void operator()(const DeviceContext& context, InputIter1 first1,
......@@ -70,8 +85,9 @@ struct Transform<platform::CUDADeviceContext> {
auto place = context.GetPlace();
PADDLE_ENFORCE(is_gpu_place(place), "It must use GPU place.");
thrust::transform(thrust::cuda::par.on(context.stream()),
details::DevPtrCast(first), details::DevPtrCast(last),
details::DevPtrCast(result), op);
details::CastToCUDATransformIterator(first),
details::CastToCUDATransformIterator(last),
details::CastToCUDATransformIterator(result), op);
}
template <typename InputIter1, typename InputIter2, typename OutputIter,
......@@ -82,9 +98,10 @@ struct Transform<platform::CUDADeviceContext> {
auto place = context.GetPlace();
PADDLE_ENFORCE(is_gpu_place(place), "It must use GPU place.");
thrust::transform(thrust::cuda::par.on(context.stream()),
details::DevPtrCast(first1), details::DevPtrCast(last1),
details::DevPtrCast(first2), details::DevPtrCast(result),
op);
details::CastToCUDATransformIterator(first1),
details::CastToCUDATransformIterator(last1),
details::CastToCUDATransformIterator(first2),
details::CastToCUDATransformIterator(result), op);
}
};
#endif
......
......@@ -18,11 +18,12 @@ limitations under the License. */
#include "paddle/fluid/platform/hostdevice.h"
#include "paddle/fluid/platform/transform.h"
namespace {
template <typename T>
class Scale {
public:
explicit Scale(const T& scale) : scale_(scale) {}
HOSTDEVICE T operator()(const T& a) const { return a * scale_; }
private:
......@@ -35,11 +36,23 @@ class Multiply {
HOSTDEVICE T operator()(const T& a, const T& b) const { return a * b; }
};
} // namespace
using paddle::memory::Alloc;
using paddle::memory::Free;
using paddle::memory::Copy;
using paddle::platform::CPUPlace;
using paddle::platform::CUDAPlace;
using paddle::platform::CPUDeviceContext;
using paddle::platform::CUDADeviceContext;
using paddle::platform::Transform;
TEST(Transform, CPUUnary) {
using namespace paddle::platform;
CPUDeviceContext ctx;
float buf[4] = {0.1, 0.2, 0.3, 0.4};
Transform<paddle::platform::CPUDeviceContext> trans;
Transform<CPUDeviceContext> trans;
trans(ctx, buf, buf + 4, buf, Scale<float>(10));
for (int i = 0; i < 4; ++i) {
ASSERT_NEAR(buf[i], static_cast<float>(i + 1), 1e-5);
......@@ -47,14 +60,12 @@ TEST(Transform, CPUUnary) {
}
TEST(Transform, GPUUnary) {
using namespace paddle::platform;
using namespace paddle::memory;
CUDAPlace gpu0(0);
CUDADeviceContext ctx(gpu0);
float cpu_buf[4] = {0.1, 0.2, 0.3, 0.4};
float* gpu_buf = static_cast<float*>(Alloc(gpu0, sizeof(float) * 4));
Copy(gpu0, gpu_buf, CPUPlace(), cpu_buf, sizeof(cpu_buf), ctx.stream());
Transform<paddle::platform::CUDADeviceContext> trans;
Transform<CUDADeviceContext> trans;
trans(ctx, gpu_buf, gpu_buf + 4, gpu_buf, Scale<float>(10));
ctx.Wait();
Copy(CPUPlace(), cpu_buf, gpu0, gpu_buf, sizeof(cpu_buf), ctx.stream());
......@@ -65,10 +76,8 @@ TEST(Transform, GPUUnary) {
}
TEST(Transform, CPUBinary) {
using namespace paddle::platform;
using namespace paddle::memory;
int buf[4] = {1, 2, 3, 4};
Transform<paddle::platform::CPUDeviceContext> trans;
Transform<CPUDeviceContext> trans;
CPUDeviceContext ctx;
trans(ctx, buf, buf + 4, buf, buf, Multiply<int>());
for (int i = 0; i < 4; ++i) {
......@@ -77,14 +86,12 @@ TEST(Transform, CPUBinary) {
}
TEST(Transform, GPUBinary) {
using namespace paddle::platform;
using namespace paddle::memory;
int buf[4] = {1, 2, 3, 4};
CUDAPlace gpu0(0);
CUDADeviceContext ctx(gpu0);
int* gpu_buf = static_cast<int*>(Alloc(gpu0, sizeof(buf)));
Copy(gpu0, gpu_buf, CPUPlace(), buf, sizeof(buf), ctx.stream());
Transform<paddle::platform::CUDADeviceContext> trans;
Transform<CUDADeviceContext> trans;
trans(ctx, gpu_buf, gpu_buf + 4, gpu_buf, gpu_buf, Multiply<int>());
ctx.Wait();
Copy(CPUPlace(), buf, gpu0, gpu_buf, sizeof(buf), ctx.stream());
......
......@@ -14,29 +14,25 @@ limitations under the License. */
#pragma once
#ifdef __CUDACC__
#ifdef __CUDACC_VER_MAJOR__
// CUDA 9 define `__CUDACC_VER__` as a warning message, manually define
// __CUDACC_VER__ instead.
// Boost 1.41.0 requires __CUDACC_VER__, but in CUDA 9 __CUDACC_VER__
// is removed, so we have to manually define __CUDACC_VER__ instead.
// For details, please refer to
// https://github.com/PaddlePaddle/Paddle/issues/6626
#if defined(__CUDACC__) && defined(__CUDACC_VER_MAJOR__)
#undef __CUDACC_VER__
#define __CUDACC_VER__ \
(__CUDACC_VER_MAJOR__ * 10000 + __CUDACC_VER_MINOR__ * 100 + \
__CUDACC_VER_BUILD__)
#endif
#define __CUDACC_VER__ \
__CUDACC_VER_BUILD__ + __CUDACC_VER_MAJOR__ * 10000 + \
__CUDACC_VER_MINOR__ * 100
#endif
#include <boost/config.hpp>
#include "boost/config.hpp"
#ifdef PADDLE_WITH_CUDA
// Because boost's variadic templates has bug on nvcc, boost will disable
// variadic template support when GPU enabled on nvcc.
// Define BOOST_NO_CXX11_VARIADIC_TEMPLATES on gcc/clang to generate same
// function symbols.
//
// Because Boost 1.41.0's variadic templates has bug on nvcc, boost
// will disable variadic template support in NVCC mode. Define
// BOOST_NO_CXX11_VARIADIC_TEMPLATES on gcc/clang to generate same
// function symbols. For details,
// https://github.com/PaddlePaddle/Paddle/issues/3386
#ifdef PADDLE_WITH_CUDA
#ifndef BOOST_NO_CXX11_VARIADIC_TEMPLATES
#define BOOST_NO_CXX11_VARIADIC_TEMPLATES
#endif
......
......@@ -505,11 +505,19 @@ All parameter, weight, gradient are variables in Paddle.
scope, local_scopes, allow_op_delay);
})
.def("bcast_params", &ParallelExecutor::BCastParamsToGPUs)
// NOTE: even we return a vec<Scope*>* to Python use reference policy.
// We still cannot get local_scope from this vector, since the element
// of vec<Scope*> will be freed by Python GC. We can only return Scope*
// one by one and mark them as reference.
.def("local_scopes",
[](ParallelExecutor &self) -> std::vector<Scope *> * {
return &self.GetLocalScopes();
},
py::return_value_policy::reference)
.def("feed_tensors_into_local_scopes",
&ParallelExecutor::FeedTensorsIntoLocalScopes)
.def("feed_and_split_tensor_into_local_scopes",
&ParallelExecutor::FeedAndSplitTensorIntoLocalScopes)
.def("run", &ParallelExecutor::Run);
BindRecordIOWriter(&m);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册