Commit 4e07f259 authored by Qiao Longfei, committed by sneaxiy

Merge pull request #12295 from jacquesqiao/speedup-reduce-sum-grad-op

Speedup reduce sum grad op
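This change adds a specialized CPU path for the gradient of reduce_sum when exactly one dimension is reduced: the upstream gradient is copied back with a plain nested loop over a [before, reduce, after] decomposition of the input shape instead of going through Eigen's broadcast expression, and all other cases fall back to the generic ReduceGradKernel. A minimal standalone sketch of that indexing scheme follows (illustration only, not the kernel itself; the name BroadcastGrad is hypothetical):

#include <cassert>
#include <cstdint>
#include <vector>

// Broadcast the reduced gradient `dout` back to the shape of `dx`, treating
// the input as flattened to [before, reduce, after] around the reduced axis.
// This mirrors the loop structure of the new CPU kernel in the diff below.
void BroadcastGrad(const std::vector<float>& dout, std::vector<float>* dx,
                   int64_t before, int64_t reduce, int64_t after) {
  for (int64_t i = 0; i < before; ++i) {
    for (int64_t j = 0; j < reduce; ++j) {
      for (int64_t k = 0; k < after; ++k) {
        (*dx)[i * reduce * after + j * after + k] = dout[i * after + k];
      }
    }
  }
}

int main() {
  // Gradient of reduce_sum over axis 1 of a 5x6x7 tensor: dout has shape 5x7.
  const int64_t before = 5, reduce = 6, after = 7;
  std::vector<float> dout(before * after, 1.0f);
  std::vector<float> dx(before * reduce * after, 0.0f);
  BroadcastGrad(dout, &dx, before, reduce, after);
  for (float v : dx) assert(v == 1.0f);  // every output element receives dout[i, k]
  return 0;
}

The registration change in the first hunk below switches the CPU reduce_sum_grad kernels from the generic ReduceGradKernel to the new ReduceSumGradKernel.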
@@ -23,12 +23,13 @@ REGISTER_OP_CPU_KERNEL(
     ops::ReduceKernel<paddle::platform::CPUDeviceContext, int, ops::SumFunctor>,
     ops::ReduceKernel<paddle::platform::CPUDeviceContext, int64_t,
                       ops::SumFunctor>);
-REGISTER_OP_CPU_KERNEL(reduce_sum_grad,
-                       ops::ReduceGradKernel<paddle::platform::CPUDeviceContext,
-                                             float, ops::SumGradFunctor>,
-                       ops::ReduceGradKernel<paddle::platform::CPUDeviceContext,
-                                             double, ops::SumGradFunctor>,
-                       ops::ReduceGradKernel<paddle::platform::CPUDeviceContext,
-                                             int, ops::SumGradFunctor>,
-                       ops::ReduceGradKernel<paddle::platform::CPUDeviceContext,
-                                             int64_t, ops::SumGradFunctor>);
+REGISTER_OP_CPU_KERNEL(
+    reduce_sum_grad,
+    ops::ReduceSumGradKernel<paddle::platform::CPUDeviceContext, float,
+                             ops::SumGradFunctor>,
+    ops::ReduceSumGradKernel<paddle::platform::CPUDeviceContext, double,
+                             ops::SumGradFunctor>,
+    ops::ReduceSumGradKernel<paddle::platform::CPUDeviceContext, int,
+                             ops::SumGradFunctor>,
+    ops::ReduceSumGradKernel<paddle::platform::CPUDeviceContext, int64_t,
+                             ops::SumGradFunctor>);
@@ -14,11 +14,69 @@
 #pragma once

+#include <vector>
+
 #include "paddle/fluid/operators/reduce_op.h"

 namespace paddle {
 namespace operators {

+// Use a plain for loop to speed up the Eigen broadcast; about 4 times faster
+// than broadcast.
+template <typename DeviceContext, typename T, typename Functor>
+class ReduceSumGradKernel : public framework::OpKernel<T> {
+ public:
+  void Compute(const framework::ExecutionContext& context) const override {
+    auto dims = context.Attr<std::vector<int>>("dim");
+    if (context.GetPlace().type() == typeid(platform::CPUPlace) &&
+        dims.size() == 1) {
+      auto* input0 = context.Input<Tensor>("X");
+      auto* input2 = context.Input<Tensor>(framework::GradVarName("Out"));
+      auto* output = context.Output<Tensor>(framework::GradVarName("X"));
+      output->mutable_data<T>(context.GetPlace());
+      const auto* input2_d = input2->data<T>();
+      auto* output_d = output->data<T>();
+
+      // handle reduce_all
+      if (input2->dims().size() == 1 && input2->dims()[0] == 1) {
+        for (int64_t i = 0; i < framework::product(input0->dims()); ++i) {
+          output_d[i] = input2_d[0];
+        }
+        return;
+      }
+
+      // handle reduce by one dimension
+      int reduce_dim_index = dims[0];
+      if (reduce_dim_index < 0) {
+        reduce_dim_index += input0->dims().size();
+      }
+
+      auto& input_dim = input0->dims();
+      int64_t before_dim = 1;
+      for (int i = 0; i < reduce_dim_index; ++i) {
+        before_dim *= input_dim[i];
+      }
+      int64_t reduce_dim = input_dim[reduce_dim_index];
+      int64_t after_dim = 1;
+      for (int i = reduce_dim_index + 1; i < input_dim.size(); ++i) {
+        after_dim *= input_dim[i];
+      }
+      for (int64_t i = 0; i < before_dim; ++i) {
+        for (int64_t j = 0; j < reduce_dim; ++j) {
+          for (int64_t k = 0; k < after_dim; ++k) {
+            output_d[i * reduce_dim * after_dim + j * after_dim + k] =
+                input2_d[i * after_dim + k];
+          }
+        }
+      }
+
+      return;
+    }
+    // default: fall back to the Eigen broadcast implementation
+    ReduceGradKernel<DeviceContext, T, Functor> kernel;
+    kernel.Compute(context);
+  }
+};
+
 struct SumFunctor {
   template <typename DeviceContext, typename X, typename Y, typename Dim>
   void operator()(const DeviceContext& place, X* x, Y* y, const Dim& dim) {
@@ -31,7 +89,7 @@ struct SumGradFunctor {
                  typename DY, typename Dim>
   void operator()(const DeviceContext& place, X* x, Y* y, DX* dx, DY* dy,
                   const Dim& dim, int size) {
-    dx->device(place) = dy->broadcast(dim);
+    dx->device(place) = dy->eval().broadcast(dim);
  }
 };
...
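The other one-line change in the header above adds eval() to the SumGradFunctor broadcast. In Eigen's Tensor module, eval() forces an expression to be materialized into a temporary before broadcast() tiles it. A small standalone sketch of that pattern (hypothetical shapes; assumes Eigen's unsupported Tensor headers, not Paddle code):

#include <unsupported/Eigen/CXX11/Tensor>

int main() {
  // dy plays the role of the reduced (upstream) gradient.
  Eigen::Tensor<float, 1> dy(4);
  dy.setConstant(2.0f);

  // eval() materializes dy before broadcast() tiles it three times,
  // mirroring the dy->eval().broadcast(dim) form used in SumGradFunctor.
  Eigen::array<Eigen::Index, 1> bcast;
  bcast[0] = 3;
  Eigen::Tensor<float, 1> dx(12);
  dx = dy.eval().broadcast(bcast);  // 12 elements, all equal to 2.0f
  return dx.size() == 12 ? 0 : 1;
}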
@@ -456,52 +456,122 @@ def py_reader(capacity,
               name=None,
               use_double_buffer=True):
     """
-    Create a reader and blocking queue for data feeding in Python
-
-    This layer returns a Reader Variable and a BlockingQueue.
-    The BlockingQueue provides `push()` method to push a `LoDTensorArray`
-    object into the queue in Python side. In C++ side, the Reader
-    Variable would invoke `pop()` method of the queue to retrieve the
-    feeding data. The process of feeding data in Python side and fetching
-    data in C++ side can run in parallel. The BlockingQueue should be closed
-    using `close()` method when unused.
+    Create a python reader for data feeding in Python
+
+    This layer returns a Reader Variable.
+    The Reader provides :code:`decorate_paddle_reader` and
+    :code:`decorate_tensor_provider` to set a Python generator as the data
+    source in Python side. When :code:`Executor::Run()` is invoked in C++
+    side, the data from the generator would be read automatically. Unlike
+    :code:`DataFeeder.feed()`, the data reading process and
+    :code:`Executor::Run()` process can run in parallel using
+    :code:`py_reader`. The :code:`start()` method of the Reader should be
+    called when each pass begins, while the :code:`reset()` method should be
+    called when the pass ends and :code:`fluid.core.EOFException` raises.
+    Note that :code:`Program.clone()` method cannot clone :code:`py_reader`.

     Args:
-       use_double_buffer(bool): Whether use double buffer or not.
-       capacity(int): The maximum capacity of the BlockingQueue.
+       capacity(int): The buffer capacity maintained by :code:`py_reader`.
        shapes(list|tuple): List of tuples which declaring data shapes.
        dtypes(list|tuple): List of strs which declaring data type.
        lod_levels(list|tuple): List of ints which declaring data lod_level.
        name(basestring): The prefix Python queue name and Reader name. None will
            be generated automatically.
+       use_double_buffer(bool): Whether use double buffer or not.

     Returns:
-       tuple(Variable, BlockingQueue):
-       A Reader Variable from which we can get feeding data.
-       A BlockingQueue object for data feeding.
+       Variable: A Reader from which we can get feeding data.

     Examples:
-        .. code-block:: python
-
-            reader, queue = fluid.layers.py_reader(
-                capacity=10,
-                shapes=[[-1,3,224,224], [-1,1]],
-                dtypes=['float32', 'int64'])
-            # Via the reader, we can use 'read_file' layer to get data:
-            image, label = fluid.layers.read_file(reader)
-
-            # Via the blocking queue, we can feed data using threads
-            def feed_data(queue, feed_images, feed_labels):
-                for feed_image, feed_label in zip(feed_images, feed_labels):
-                    data = core.LoDTensorArray()
-                    data.append(feed_image)
-                    data.append(feed_label)
-                    queue.push(data)
-
-            thread = threading.Thread(target=feed_data, args=(queue, feed_images, feed_labels))
-            thread.start()
+
+       1. The basic usage of :code:`py_reader` is as follows:
+
+       >>> import paddle.v2
+       >>> import paddle.fluid as fluid
+       >>> import paddle.dataset.mnist as mnist
+       >>>
+       >>> reader = fluid.layers.py_reader(capacity=64,
+       >>>                                 shapes=[(-1,3,224,224), (-1,1)],
+       >>>                                 dtypes=['float32', 'int64'])
+       >>> reader.decorate_paddle_reader(
+       >>>     paddle.v2.reader.shuffle(paddle.batch(mnist.train())
+       >>>
+       >>> img, label = fluid.layers.read_file(reader)
+       >>> loss = network(img, label)  # some network definition
+       >>>
+       >>> fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())
+       >>>
+       >>> exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
+       >>> for epoch_id in range(10):
+       >>>     reader.start()
+       >>>     try:
+       >>>         while True:
+       >>>             exe.run(fetch_list=[loss.name])
+       >>>     except fluid.core.EOFException:
+       >>>         reader.reset()
+
+       2. When training and testing are both performed, two different
+       :code:`py_reader` should be created with different names, e.g.:
+
+       >>> import paddle.v2
+       >>> import paddle.fluid as fluid
+       >>> import paddle.dataset.mnist as mnist
+       >>>
+       >>> def network(reader):
+       >>>     img, label = fluid.layers.read_file(reader)
+       >>>     # Here, we omitted the network definition
+       >>>     return loss
+       >>>
+       >>> train_reader = fluid.layers.py_reader(capacity=64,
+       >>>                                       shapes=[(-1,3,224,224), (-1,1)],
+       >>>                                       dtypes=['float32', 'int64'],
+       >>>                                       name='train_reader')
+       >>> train_reader.decorate_paddle_reader(
+       >>>     paddle.v2.reader.shuffle(paddle.batch(mnist.train())
+       >>>
+       >>> test_reader = fluid.layers.py_reader(capacity=32,
+       >>>                                      shapes=[(-1,3,224,224), (-1,1)],
+       >>>                                      dtypes=['float32', 'int64'],
+       >>>                                      name='test_reader')
+       >>> test_reader.decorate_paddle_reader(paddle.batch(mnist.test(), 512))
+       >>>
+       >>> # Create train_main_prog and train_startup_prog
+       >>> train_main_prog = fluid.Program()
+       >>> train_startup_prog = fluid.Program()
+       >>> with fluid.program_guard(train_main_prog, train_startup_prog):
+       >>>     # Use fluid.unique_name.guard() to share parameters with test program
+       >>>     with fluid.unique_name.guard():
+       >>>         train_loss = network(train_reader)  # some network definition
+       >>>         adam = fluid.optimizer.Adam(learning_rate=0.01)
+       >>>         adam.minimize(loss)
+       >>>
+       >>> # Create test_main_prog and test_startup_prog
+       >>> test_main_prog = fluid.Program()
+       >>> test_startup_prog = fluid.Program()
+       >>> with fluid.program_guard(test_main_prog, test_startup_prog):
+       >>>     # Use fluid.unique_name.guard() to share parameters with train program
+       >>>     with fluid.unique_name.guard():
+       >>>         test_loss = network(test_reader)
+       >>>
+       >>> fluid.Executor(fluid.CUDAPlace(0)).run(train_startup_prog)
+       >>> fluid.Executor(fluid.CUDAPlace(0)).run(test_startup_prog)
+       >>>
+       >>> train_exe = fluid.ParallelExecutor(use_cuda=True,
+       >>>                                    loss_name=train_loss.name, main_program=train_main_prog)
+       >>> test_exe = fluid.ParallelExecutor(use_cuda=True,
+       >>>                                   loss_name=test_loss.name, main_program=test_main_prog)
+       >>> for epoch_id in range(10):
+       >>>     try:
+       >>>         while True:
+       >>>             train_exe.run(fetch_list=[train_loss.name])
+       >>>     except fluid.core.EOFException:
+       >>>         train_reader.reset()
+       >>>
+       >>>     try:
+       >>>         while True:
+       >>>             test_exe.run(fetch_list=[test_loss.name])
+       >>>     except fluid.core.EOFException:
+       >>>         test_reader.reset()
     """
     dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
     shape_concat = []
...
@@ -2961,7 +2961,7 @@ def reduce_sum(input, dim=None, keep_dim=False, name=None):
             # x is a Tensor variable with following elements:
             # [[0.2, 0.3, 0.5, 0.9]
             #  [0.1, 0.2, 0.6, 0.7]]
-            # Each example is followed by the correspending output tensor.
+            # Each example is followed by the corresponding output tensor.
             fluid.layers.reduce_sum(x)  # [3.5]
             fluid.layers.reduce_sum(x, dim=0)  # [0.3, 0.5, 1.1, 1.6]
             fluid.layers.reduce_sum(x, dim=-1)  # [1.9, 1.6]
@@ -2970,7 +2970,7 @@ def reduce_sum(input, dim=None, keep_dim=False, name=None):
             # x is a Tensor variable with shape [2, 2, 2] and elements as below:
             # [[[1, 2], [3, 4]],
             #  [[5, 6], [7, 8]]]
-            # Each example is followed by the correspending output tensor.
+            # Each example is followed by the corresponding output tensor.
             fluid.layers.reduce_sum(x, dim=[1, 2])  # [10, 26]
             fluid.layers.reduce_sum(x, dim=[0, 1])  # [16, 20]
...
@@ -89,15 +89,11 @@ class TestProdOp(OpTest):
         self.check_grad(['X'], 'Out')


-class TestKeepDimReduce(OpTest):
+class Test1DReduce(OpTest):
     def setUp(self):
         self.op_type = "reduce_sum"
-        self.inputs = {'X': np.random.random((5, 6, 10)).astype("float64")}
-        self.attrs = {'dim': [-2], 'keep_dim': True}
-        self.outputs = {
-            'Out':
-            self.inputs['X'].sum(axis=tuple(self.attrs['dim']), keepdims=True)
-        }
+        self.inputs = {'X': np.random.random(20).astype("float64")}
+        self.outputs = {'Out': self.inputs['X'].sum(axis=0)}

     def test_check_output(self):
         self.check_output()
@@ -106,32 +102,82 @@ class TestKeepDimReduce(OpTest):
         self.check_grad(['X'], 'Out')


-class Test1DReduce(OpTest):
+class Test2DReduce0(Test1DReduce):
     def setUp(self):
         self.op_type = "reduce_sum"
-        self.inputs = {'X': np.random.random(20).astype("float64")}
+        self.attrs = {'dim': [0]}
+        self.inputs = {'X': np.random.random((20, 10)).astype("float64")}
         self.outputs = {'Out': self.inputs['X'].sum(axis=0)}

-    def test_check_output(self):
-        self.check_output()

-    def test_check_grad(self):
-        self.check_grad(['X'], 'Out')
+class Test2DReduce1(Test1DReduce):
+    def setUp(self):
+        self.op_type = "reduce_sum"
+        self.attrs = {'dim': [1]}
+        self.inputs = {'X': np.random.random((20, 10)).astype("float64")}
+        self.outputs = {
+            'Out': self.inputs['X'].sum(axis=tuple(self.attrs['dim']))
+        }


-class TestReduceAll(OpTest):
+class Test3DReduce0(Test1DReduce):
+    def setUp(self):
+        self.op_type = "reduce_sum"
+        self.attrs = {'dim': [1]}
+        self.inputs = {'X': np.random.random((5, 6, 7)).astype("float64")}
+        self.outputs = {
+            'Out': self.inputs['X'].sum(axis=tuple(self.attrs['dim']))
+        }
+
+
+class Test3DReduce1(Test1DReduce):
+    def setUp(self):
+        self.op_type = "reduce_sum"
+        self.attrs = {'dim': [2]}
+        self.inputs = {'X': np.random.random((5, 6, 7)).astype("float64")}
+        self.outputs = {
+            'Out': self.inputs['X'].sum(axis=tuple(self.attrs['dim']))
+        }
+
+
+class Test3DReduce2(Test1DReduce):
+    def setUp(self):
+        self.op_type = "reduce_sum"
+        self.attrs = {'dim': [-2]}
+        self.inputs = {'X': np.random.random((5, 6, 7)).astype("float64")}
+        self.outputs = {
+            'Out': self.inputs['X'].sum(axis=tuple(self.attrs['dim']))
+        }
+
+
+class Test3DReduce3(Test1DReduce):
+    def setUp(self):
+        self.op_type = "reduce_sum"
+        self.attrs = {'dim': [1, 2]}
+        self.inputs = {'X': np.random.random((5, 6, 7)).astype("float64")}
+        self.outputs = {
+            'Out': self.inputs['X'].sum(axis=tuple(self.attrs['dim']))
+        }
+
+
+class TestKeepDimReduce(Test1DReduce):
+    def setUp(self):
+        self.op_type = "reduce_sum"
+        self.inputs = {'X': np.random.random((5, 6, 10)).astype("float64")}
+        self.attrs = {'dim': [1], 'keep_dim': True}
+        self.outputs = {
+            'Out': self.inputs['X'].sum(axis=tuple(self.attrs['dim']),
+                                        keepdims=self.attrs['keep_dim'])
+        }
+
+
+class TestReduceAll(Test1DReduce):
     def setUp(self):
         self.op_type = "reduce_sum"
         self.inputs = {'X': np.random.random((5, 6, 2, 10)).astype("float64")}
         self.attrs = {'reduce_all': True}
         self.outputs = {'Out': self.inputs['X'].sum()}

-    def test_check_output(self):
-        self.check_output()
-
-    def test_check_grad(self):
-        self.check_grad(['X'], 'Out')
-

 ## reduction in multi dims
 class TestReduceMeanOpMultiAxises(OpTest):
...