Commit f031555c authored by Q qiaolongfei

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into add-merge-splited-ids

@@ -118,6 +118,10 @@ endif()
 set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SIMD_FLAG}")
 set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SIMD_FLAG}")
+if(WITH_DISTRIBUTE)
+  add_definitions(-DPADDLE_WITH_DISTRIBUTE)
+endif()
 if(WITH_GOLANG)
   # we need to symlink Paddle directory into GOPATH. If we
   # don't do it and we have code that depends on Paddle, go
...
# Automatic Differentiation with the Tape
## Automatic Differentiation
A key challenge in the field of deep learning is to automatically derive the backward pass from the forward pass described algorithmically by researchers. Such a derivation, or transformation, of the forward-pass program has long been studied, well before the recent prosperity of deep learning, in the field known as [automatic differentiation](https://arxiv.org/pdf/1502.05767.pdf).
## The Tape
Given the forward pass program (usually written in Python in practice), there are two strategies to derive the backward pass:
1. from the forward pass program itself, or
1. from the execution trace of the forward pass program, which is often known as the *tape*.
This article surveys systems that follow the latter strategy.
## Dynamic Network
When we train a deep learning model, the tape changes in every iteration as the input data change, so we have to re-derive the backward pass in every iteration. This is known as a *dynamic network*.
Deep learning systems built around the idea of dynamic networks have gained popularity in recent years. This article surveys two representative systems: [PyTorch](https://pytorch.org/) and [DyNet](https://dynet.readthedocs.io/en/latest/).
## An Overview
Both frameworks record a 'tape' of the computation and then interpret (or run-time compile) a transformation of the tape played back in reverse. The tape is a different kind of entity than the original program. [[link]](http://www.bcl.hamilton.ie/~barak/papers/toplas-reverse.pdf)
Consider the following feedforward model.
```python
x = Variable(randn(20, 1))
label = Variable(randint(1))
W_1, W_2 = Variable(randn(20, 20)), Variable(randn(10, 20))
h = matmul(W_1, x)
pred = matmul(W_2, h)
loss = softmax(pred, label)
loss.backward()
```
### 1) DyNet uses a List to encode the Tape
During forward execution, a list of operators, in this case `matmul`, `matmul` and `softmax`, is recorded in the tape, along with the information needed for the backward pass, such as pointers to the operators' inputs and outputs. The tape is then played back in reverse order at `loss.backward()`. A minimal sketch of this scheme follows the figure below.
<details>
<summary></summary>
digraph g {
    graph [
        rankdir = "LR"
    ];
    node [
        fontsize = "16"
        shape = "ellipse"
    ];
    edge [];
    "node0" [
        label = "<f0> type: matmul | <f1> input: W_1, x | <f2> output: h"
        shape = "record"
    ];
    "node1" [
        label = "<f0> type: matmul | <f1> input: W_2, h | <f2> output: pred"
        shape = "record"
    ];
    "node2" [
        label = "<f0> type: softmax | <f1> input: pred, label | <f2> output: loss"
        shape = "record"
    ];
    "node0":f0 -> "node1":f0 [];
    "node1":f0 -> "node2":f0 [];
}
</details>
![Alt text](https://g.gravizo.com/svg?digraph%20g%20{%20graph%20[%20rankdir%20=%20%22LR%22%20];%20node%20[%20fontsize%20=%20%2216%22%20shape%20=%20%22ellipse%22%20];%20edge%20[];%20%22node0%22%20[%20label%20=%20%22%3Cf0%3E%20type:%20matmul%20|%20%3Cf1%3E%20input:%20W_1,%20x%20|%20%3Cf2%3E%20output:%20h%22%20shape%20=%20%22record%22%20];%20%22node1%22%20[%20label%20=%20%22%3Cf0%3E%20type:%20matmul%20|%20%3Cf1%3E%20input:%20W_2,%20h%20|%20%3Cf2%3E%20output:%20pred%22%20shape%20=%20%22record%22%20];%20%22node2%22%20[%20label%20=%20%22%3Cf0%3E%20type:%20softmax%20|%20%3Cf1%3E%20input:%20pred,%20label%20|%20%3Cf2%3E%20output:%20loss%22%20shape%20=%20%22record%22%20];%20%22node0%22:f0%20-%3E%20%22node1%22:f0%20[%20id%20=%200%20];%20%22node1%22:f0%20-%3E%20%22node2%22:f0%20[%20id%20=%201%20];%20})
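Below is a minimal Python sketch of this scheme; `tape`, `record`, and `backward` are hypothetical names for illustration, not DyNet's actual API. Each forward operator appends one record, and backward simply walks the list in reverse while accumulating gradients:
```python
# A minimal sketch of a list-encoded tape (hypothetical, not DyNet's code).
tape = []

def record(op_type, inputs, output, backward_fn):
    # backward_fn maps the output gradient to one gradient per input
    tape.append((op_type, inputs, output, backward_fn))

def backward(loss):
    grads = {id(loss): 1.0}  # seed gradient: d(loss)/d(loss) = 1
    for op_type, inputs, output, backward_fn in reversed(tape):
        out_grad = grads.get(id(output), 0.0)
        for var, grad in zip(inputs, backward_fn(out_grad)):
            grads[id(var)] = grads.get(id(var), 0.0) + grad  # accumulate
    return grads
```
Note that no topological sort is needed: the list is already in execution order, so its reverse is a valid order for the backward pass.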
### 2) PyTorch uses a Node Graph to encode the Tape
The graph is composed of `Variable`s and `Function`s. During forward execution, each `Variable` records its creator function, e.g. `h.creator = matmul`, and each `Function` records the previous/dependent functions of its inputs in `prev_func` through `creator`, e.g. `matmul1.prev_func = matmul0`. At `loss.backward()`, a topological sort is performed on all `prev_func`s, and the grad ops are then run in the sorted order. A minimal sketch of this bookkeeping follows the figure below.
<details>
<summary></summary>
digraph g {
    graph [
        rankdir = "LR"
    ];
    subgraph function {
        node [
            fontsize = "16"
            style = filled
            shape = "record"
        ];
        "matmul0" [ label = "<f0> type: matmul | prev_func: None" ];
        "matmul1" [ label = "<f0> type: matmul | prev_func: matmul" ];
        "softmax" [ label = "<f0> type: softmax | prev_func: matmul" ];
    }
    subgraph variable {
        node [
            fontsize = "16"
            shape = "Mrecord"
            style = filled
            fillcolor = white
        ];
        "x" [ label = "<f0> x | <f1> creator: None" ];
        "label" [ label = "<f0> label | <f1> creator: None" ];
        "W_1" [ label = "<f0> W_1 | <f1> creator: None" ];
        "W_2" [ label = "<f0> W_2 | <f1> creator: None" ];
        "h" [ label = "<f0> h | <f1> creator: None" ];
        "pred" [ label = "<f0> pred | <f1> creator: matmul" ];
        "loss" [ label = "<f0> loss | <f1> creator: softmax" ];
    }
    subgraph data_flow {
        "x":f0 -> "matmul0":f0;
        "W_1":f0 -> "matmul0":f0;
        "matmul0":f0 -> "h":f0;
        "h":f0 -> "matmul1":f0;
        "W_2":f0 -> "matmul1":f0;
        "matmul1":f0 -> "pred":f0;
        "pred":f0 -> "softmax":f0;
        "label":f0 -> "softmax":f0;
        "softmax":f0 -> "loss":f0;
    }
    subgraph prev_func {
        edge [color="red", arrowsize="0.6", penwidth="1", constraint=false];
        "matmul1":f1 -> "matmul0":f0;
        "softmax":f1 -> "matmul1":f0;
        label = "prev_func";
    }
}
</details>
![Alt text](https://g.gravizo.com/svg?digraph%20g%20{%20graph%20[%20rankdir%20=%20%22LR%22%20];%20subgraph%20function%20{%20node%20[%20fontsize%20=%20%2216%22%20style%20=%20filled%20shape%20=%20%22record%22%20];%20%22matmul0%22%20[%20label%20=%20%22%3Cf0%3E%20type:%20matmul%20|%20prev_func:%20None%22%20];%20%22matmul1%22%20[%20label%20=%20%22%3Cf0%3E%20type:%20matmul%20|%20prev_func:%20matmul%22%20];%20%22softmax%22%20[%20label%20=%20%22%3Cf0%3E%20type:%20softmax%20|%20prev_func:%20matmul%22%20];%20}%20subgraph%20variable%20{%20node%20[%20fontsize%20=%20%2216%22%20shape%20=%20%22Mrecord%22%20style%20=%20filled%20fillcolor%20=%20white%20];%20%22x%22%20[%20label%20=%20%22%3Cf0%3E%20x%20|%20%3Cf1%3E%20creator:%20None%22%20];%20%22label%22%20[%20label%20=%20%22%3Cf0%3E%20label%20|%20%3Cf1%3E%20creator:%20None%22%20];%20%22W_1%22%20[%20label%20=%20%22%3Cf0%3E%20W_1%20|%20%3Cf1%3E%20creator:%20None%22%20];%20%22W_2%22%20[%20label%20=%20%22%3Cf0%3E%20W_2%20|%20%3Cf1%3E%20creator:%20None%22%20];%20%22h%22%20[%20label%20=%20%22%3Cf0%3E%20h%20|%20%3Cf1%3E%20creator:%20None%22%20];%20%22pred%22%20[%20label%20=%20%22%3Cf0%3E%20pred%20|%20%3Cf1%3E%20creator:%20matmul%22%20];%20%22loss%22%20[%20label%20=%20%22%3Cf0%3E%20loss%20|%20%3Cf1%3E%20creator:%20softmax%22%20];%20}%20subgraph%20data_flow%20{%20%22x%22:f0%20-%3E%20%22matmul0%22:f0;%20%22W_1%22:f0%20-%3E%20%22matmul0%22:f0;%20%22matmul0%22:f0%20-%3E%20%22h%22:f0;%20%22h%22:f0%20-%3E%20%22matmul1%22:f0;%20%22W_2%22:f0%20-%3E%20%22matmul1%22:f0;%20%22matmul1%22:f0%20-%3E%20%22pred%22:f0;%20%22pred%22:f0%20-%3E%20%22softmax%22:f0;%20%22label%22:f0%20-%3E%20%22softmax%22:f0;%20%22softmax%22:f0%20-%3E%20%22loss%22:f0;%20}%20subgraph%20prev_func%20{%20edge%20[color=%22red%22,%20arrowsize=%220.6%22,%20penwidth=%221%22,%20constraint=false];%20%22matmul1%22:f1%20-%3E%20%22matmul0%22:f0;%20%22softmax%22:f1%20-%3E%20%22matmul1%22:f0;%20label%20=%20%22prev_func%22;%20}%20})
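The sketch below illustrates this bookkeeping in miniature; the class and attribute names are hypothetical and greatly simplified relative to PyTorch's actual source:
```python
# A minimal sketch of the node-graph encoding (hypothetical, not PyTorch's code).
class Variable:
    def __init__(self, data, creator=None):
        self.data = data
        self.creator = creator  # the Function that produced this Variable

class Function:
    def __call__(self, *inputs):
        out_data = self.forward(*(v.data for v in inputs))
        # record backward edges to the functions that produced our inputs
        self.prev_funcs = [v.creator for v in inputs if v.creator is not None]
        return Variable(out_data, creator=self)

def toposort(root_func):
    order, seen = [], set()
    def visit(f):
        if f is not None and id(f) not in seen:
            seen.add(id(f))
            for p in f.prev_funcs:
                visit(p)
            order.append(f)
    visit(root_func)
    return reversed(order)  # loss's creator first, leaf creators last

# loss.backward() would then run the grad op of every Function
# in toposort(loss.creator) order.
```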
Chainer and Autograd use similar techniques to record the forward pass. For details, please refer to the appendix.
## Design choices
### 1) DyNet's List vs. PyTorch's Node Graph
What's good about the List:
1. It avoids a topological sort. One only needs to traverse the list of operators in reverse, calling the corresponding backward operator of each entry.
1. It enables efficient data-parallel implementations. One can count how many times each variable is used while constructing the list; during playback, one then knows exactly when the calculation of a variable has completed, which makes it possible to overlap communication with computation (see the sketch after this list).
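As a sketch of the second point (reusing the hypothetical `tape` from the earlier sketch), one pass over the list counts how many ops consume each variable; during reverse playback, a gradient is final once its pending-use count reaches zero, so its all-reduce can start while the remaining backward ops are still running:
```python
# Hypothetical sketch: detect when a variable's gradient is complete so that
# communication (e.g. an all-reduce of a parameter gradient) can overlap
# with the rest of the backward computation.
use_count = {}
for _, inputs, _, _ in tape:  # tape from the earlier sketch
    for v in inputs:
        use_count[id(v)] = use_count.get(id(v), 0) + 1

def on_backward_step(op_inputs, start_allreduce):
    for v in op_inputs:
        use_count[id(v)] -= 1
        if use_count[id(v)] == 0:  # no later backward op touches v's gradient
            start_allreduce(v)     # safe to start communicating now
```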
What's good about the Node Graph:
1. More flexibility. PyTorch users can mix and match independent graphs however they like, in whatever threads they like (without explicit synchronization). An added benefit of structuring graphs this way is that when a portion of the graph becomes dead, it is automatically freed. [[2]](https://openreview.net/pdf?id=BJJsrmfCZ) In the following example, PyTorch only runs backward on SmallNet, while DyNet runs backward on both BigNet and SmallNet.
```python
result = BigNet(data)
loss = SmallNet(data)
loss.backward()
```
### 2) DyNet's Lazy Evaluation vs. PyTorch's Immediate Evaluation
DyNet builds the list symbolically. Consider the following example:
```python
for epoch in range(num_epochs):
    for in_words, out_label in training_data:
        dy.renew_cg()
        W = dy.parameter(W_p)
        b = dy.parameter(b_p)
        score_sym = dy.softmax(W*dy.concatenate([E[in_words[0]],E[in_words[1]]])+b)
        loss_sym = dy.pickneglogsoftmax(score_sym, out_label)
        loss_val = loss_sym.value()
        loss_sym.backward()
```
The computation of `lookup`, `concat`, `matmul` and `softmax` does not happen until `loss_sym.value()` is called. This deferred execution is useful because it makes graph-level optimizations, e.g. kernel fusion, possible; a toy sketch follows below.
PyTorch chooses immediate evaluation. It avoids ever materializing a "forward graph"/"tape" (there is no need to explicitly call `dy.renew_cg()` to reset the list), recording only what is necessary to differentiate the computation, i.e. `creator` and `prev_func`.
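A toy illustration of the lazy strategy (a hypothetical `LazyExpr`, not DyNet's API): constructing an expression only records a node, and nothing is computed until `.value()` forces the whole recorded subgraph, which is exactly the point where graph-level optimizations such as kernel fusion could be applied.
```python
# Hypothetical sketch of lazy evaluation, not DyNet's actual implementation.
class LazyExpr:
    def __init__(self, op, *args):
        self.op, self.args = op, args  # record a node; compute nothing yet
        self._cached = None

    def value(self):
        # force evaluation of the recorded subgraph on demand
        if self._cached is None:
            vals = [a.value() if isinstance(a, LazyExpr) else a
                    for a in self.args]
            self._cached = self.op(*vals)
        return self._cached

# Nothing is computed while the expression is being built...
y = LazyExpr(lambda a, b: a + b, 2, LazyExpr(lambda a: a * 3, 4))
# ...until value() is called: (4 * 3) + 2 == 14
assert y.value() == 14
```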
## What can Fluid learn from them?
TBD
# Appendix
### Overview
| Framework | Has Tape | Core in C++ | First Release Date |
|-----------|----------|-------------|--------------------|
| Autograd | No | No | Mar 5, 2015 |
| Chainer | No | No | Jun 5, 2015 |
| PyTorch | No | Yes | Aug 31, 2016 |
| DyNet | Yes | Yes | Oct 12, 2016 |
### Source Code
#### Autograd
[Backward code](https://github.com/HIPS/autograd/blob/442205dfefe407beffb33550846434baa90c4de7/autograd/core.py#L8-L40). In the forward pass, a graph of `VJPNode`s is constructed.
```python
# User API
def make_grad(fun, x):
    start_node = VJPNode.new_root()
    end_value, end_node = trace(start_node, fun, x)
    # g is the seed gradient of the output (e.g. 1.0 for a scalar loss)
    return backward_pass(g, end_node), end_value

# trace the forward pass by creating VJPNodes
def trace(start_node, fun, x):
    with trace_stack.new_trace() as t:
        start_box = new_box(x, t, start_node)
        end_box = fun(start_box)
        return end_box._value, end_box._node

def backward_pass(g, end_node):
    outgrads = {end_node: (g, False)}
    for node in toposort(end_node):
        outgrad = outgrads.pop(node)
        ingrads = node.vjp(outgrad[0])
        for parent, ingrad in zip(node.parents, ingrads):
            outgrads[parent] = add_outgrads(outgrads.get(parent), ingrad)
    return outgrad[0]

# Every VJPNode corresponds to an op_grad
class VJPNode(Node):
    __slots__ = ['parents', 'vjp']
    def __init__(self, value, fun, args, kwargs, parent_argnums, parents):
        self.parents = parents
        vjpmaker = primitive_vjps[fun]
        self.vjp = vjpmaker(parent_argnums, value, args, kwargs)
```
#### Chainer
Example Code
```python
# (1) Function Set definition, creates FunctionNode
model = FunctionSet(
    l1=F.Linear(784, 100),
    l2=F.Linear(100, 100),
    l3=F.Linear(100, 10)).to_gpu()

# (2) Optimizer Setup
opt = optimizers.SGD()
opt.setup(model)

# (3) Forward computation
def forward(x, t):
    h1 = F.relu(model.l1(x))
    h2 = F.relu(model.l2(h1))
    y = model.l3(h2)
    return F.softmax_cross_entropy(y, t)

# (4) Training loop
for epoch in xrange(n_epoch):
    for i in xrange(0, N, b_size):
        x = Variable(to_gpu(...))
        t = Variable(to_gpu(...))
        opt.zero_grads()
        loss = forward(x, t)
        loss.backward()
        opt.update()
```
In `forward(x, t)`, a graph of [`VariableNode`](https://github.com/chainer/chainer/blob/master/chainer/variable.py#L110) and [`FunctionNode`](https://github.com/chainer/chainer/blob/a69103a4aa59d5b318f39b01dbcb858d465b89cf/chainer/function_node.py#L19) is constructed. Every output's `VariableNode.creator` points to the `FunctionNode`.
```python
class FunctionNode(object):
    ...
    def apply(self, inputs):
        outputs = self.forward(inputs)
        ret = tuple([variable.Variable(y, requires_grad=requires_grad)
                     for y in outputs])
        # Topological ordering
        self.rank = max([x.rank for x in inputs]) if inputs else 0
        # Add backward edges
        for y in ret:
            y.creator_node = self
        self.inputs = tuple([x.node for x in inputs])
        self.outputs = tuple([y.node for y in ret])
        return ret
```
`loss.backward()` calculates the accumulated gradients of all variables. The backward of each `FunctionNode` is called in topological order.
```python
class VariableNode(object):
    ...
    def backward(self, retain_grad, loss_scale):
        if self.creator_node is None:
            return

        cand_funcs = []
        seen_set = set()
        grads = {}

        # Initialize error by 1, if this is a loss variable
        if self.data.size == 1 and self._grad_var is None:
            self.grad = numpy.ones_like(self.data)
        grads[self._node] = self._grad_var

        def add_cand(cand):
            if cand not in seen_set:
                # Negate rank since heapq is a min-heap
                heapq.heappush(cand_funcs, (-cand.rank, len(seen_set), cand))
                seen_set.add(cand)

        add_cand(self.creator_node)

        while cand_funcs:
            _, _, func = heapq.heappop(cand_funcs)
            gxs = func.backward_accumulate(func.inputs, func.outputs,
                                           func.outputs.grad)

            for x, gx in zip(func.inputs, gxs):
                if x in grads:
                    grads[x] += gx
                else:
                    grads[x] = gx

                if x.creator_node is not None:
                    add_cand(x.creator_node)
```
#### PyTorch
Example Code
```python
x = Variable(torch.ones(5, 5))
y = Variable(torch.ones(5, 5) * 4)
z = x ** 2 + x * 2 + x * y + y
z.backward(torch.ones(5, 5))
```
The trace is done by `Variable.creator` and `Function.previous_functions`.
```python
class Variable(object):
    def __init__(self, tensor, creator=None, requires_grad=True):
        if creator is None:
            creator = Leaf(self, requires_grad)
        self.data = tensor
        self.creator = creator
        self._grad = None

    def backward(self, gradient=None):
        if gradient is None:
            if self.data.numel() != 1:
                raise RuntimeError(
                    'backward should be called only on a scalar (i.e. '
                    '1-element tensor) or with gradient w.r.t. the variable')
            gradient = self.data.new(1).fill_(1)
        self._execution_engine.run_backward(self, gradient)

class Function(object):
    # ...
    def _do_forward(self, *input):
        unpacked_input = tuple(arg.data for arg in input)
        raw_output = self.forward(*unpacked_input)
        # mark output.creator = self for backward trace
        output = tuple(Variable(tensor, self) for tensor in raw_output)
        self.previous_functions = [(arg.creator, id(arg)) for arg in input]
        self.output_ids = {id(var): i for i, var in enumerate(output)}
        return output

    def _do_backward(self, grad_output):
        return self.backward(grad_output)
```
The [backward engine](https://github.com/pytorch/pytorch/blob/v0.1.1/torch/autograd/engine.py) is similar to Autograd's.
#### DyNet
Example code
```python
model = dy.model()
W_p = model.add_parameters((20, 100))
b_p = model.add_parameters(20)
E = model.add_lookup_parameters((20000, 50))
for epoch in range(num_epochs):
    for in_words, out_label in training_data:
        dy.renew_cg()  # init tape
        W = dy.parameter(W_p)
        b = dy.parameter(b_p)
        score_sym = dy.softmax(W*dy.concatenate([E[in_words[0]],E[in_words[1]]])+b)
        loss_sym = dy.pickneglogsoftmax(score_sym, out_label)
        loss_val = loss_sym.value()
        loss_sym.backward()
```
[forward](https://github.com/clab/dynet/blob/740a9626a13a2732544de142e256ad0d0a166658/dynet/exec.cc#L84-L158), [backward](https://github.com/clab/dynet/blob/740a9626a13a2732544de142e256ad0d0a166658/dynet/exec.cc#L166-L284). The trace is done by creating a tape of expressions in every iteration. Backward is done by traversing the tape in reverse order.
```c++
void SimpleExecutionEngine::backward(VariableIndex from_where, bool full) {
  ...
  for (int i = num_nodes - 1; i >= 0; --i) {
    // each node corresponds to an op
    node->backward(xs, node_fx, node_dEdfx, ai, node_dEdxai);
  }
  ...
}
```
@@ -83,8 +83,13 @@ cc_library(lod_rank_table SRCS lod_rank_table.cc DEPS lod_tensor)
 cc_library(feed_fetch_method SRCS feed_fetch_method.cc DEPS lod_tensor scope glog)
-cc_library(executor SRCS executor.cc DEPS op_registry device_context scope
-           framework_proto glog lod_rank_table feed_fetch_method)
+if(WITH_DISTRIBUTE)
+  cc_library(executor SRCS executor.cc DEPS op_registry device_context scope framework_proto glog lod_rank_table feed_fetch_method sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr)
+  set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
+  set_source_files_properties(executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
+else()
+  cc_library(executor SRCS executor.cc DEPS op_registry device_context scope framework_proto glog lod_rank_table feed_fetch_method)
+endif()
 cc_library(parallel_executor SRCS parallel_executor.cc DEPS ssa_graph_builder_factory threaded_ssa_graph_executor scope_buffered_ssa_graph_executor)
...
@@ -19,7 +19,7 @@
 namespace paddle {
 namespace framework {
 namespace details {
-class SSAGraph;
+struct SSAGraph;
 class SSAGraghBuilderWithChecker : public SSAGraphBuilder {
  public:
...
@@ -20,6 +20,9 @@ limitations under the License. */
 #include "paddle/fluid/framework/lod_tensor_array.h"
 #include "paddle/fluid/framework/op_registry.h"
 #include "paddle/fluid/framework/reader.h"
+#ifdef PADDLE_WITH_DISTRIBUTE
+#include "paddle/fluid/operators/detail/grpc_client.h"
+#endif
 #include "paddle/fluid/platform/place.h"
 #include "paddle/fluid/platform/profiler.h"
@@ -44,6 +47,14 @@ ExecutorPrepareContext::~ExecutorPrepareContext() {
 Executor::Executor(const platform::Place& place) : place_(place) {}

+#ifdef PADDLE_WITH_DISTRIBUTE
+void Executor::Complete() {
+  ::paddle::operators::detail::RPCClient::GetInstance<
+      ::paddle::operators::detail::GRPCClient>()
+      ->SendComplete();
+}
+#endif
+
 void InitializeVariable(Variable* var, proto::VarType::Type var_type) {
   if (var_type == proto::VarType::LOD_TENSOR) {
     var->GetMutable<LoDTensor>();
...
@@ -44,6 +44,13 @@ class Executor {
   explicit Executor(const platform::Place& place);

+#ifdef PADDLE_WITH_DISTRIBUTE
+  /*
+   * Send a signal to the pserver to mark that the current trainer has stopped.
+   */
+  void Complete();
+#endif
+
   /* @Brief
    * Runtime evaluation of the given ProgramDesc under certain Scope
    *
...
@@ -34,6 +34,12 @@ void GRPCClient::InitEventLoop() {
   client_thread_.reset(new std::thread(std::bind(&GRPCClient::Proceed, this)));
 }

+void GRPCClient::SendComplete() {
+  for (auto& it : channels_) {
+    this->AsyncSendComplete(it.first);
+  }
+}
+
 GRPCClient::~GRPCClient() {
   Wait();
   cq_.Shutdown();
@@ -210,6 +216,19 @@ void GRPCClient::AsyncSendFetchBarrier(const std::string& ep,
   req_count_++;
 }

+void GRPCClient::AsyncSendComplete(const std::string& ep, int64_t time_out) {
+  const auto ch = GetChannel(ep);
+
+  BatchBarrierProcessor* s = new BatchBarrierProcessor(ch);
+  s->Prepare(time_out);
+
+  sendrecv::VariableMessage req;
+  req.set_varname(COMPLETE_MESSAGE);
+  auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
+  rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
+  req_count_++;
+}
+
 void GRPCClient::Wait() {
   std::unique_lock<std::mutex> lk(sync_mutex_);
   sync_cond_.wait(lk, [this] { return req_count_ == 0; });
...
@@ -195,6 +195,8 @@ class GRPCClient : public RPCClient {
   void Wait() override;

+  void SendComplete() override;
+
  protected:
   void InitImpl() override;
@@ -204,6 +206,9 @@ class GRPCClient : public RPCClient {
   void Proceed();

+  void AsyncSendComplete(const std::string& ep,
+                         int64_t time_out = RPCClient::rpc_time_out);
+
   std::shared_ptr<grpc::Channel> GetChannel(const std::string& ep);

  private:
...
@@ -162,16 +162,18 @@ class RequestPrefetch final : public RequestBase {
   void Process() override {
     // prefetch process...
-    std::string varname = request_->OutVarname();
-    VLOG(3) << "RequestPrefetch " << varname;
+    std::string in_var_name = request_->Varname();
+    std::string out_var_name = request_->OutVarname();
+    VLOG(3) << "RequestPrefetch, in_var_name: " << in_var_name
+            << " out_var_name: " << out_var_name;

     auto scope = request_->GetMutableLocalScope();
-    auto invar = scope->FindVar(varname);
-    framework::Variable* outvar = nullptr;
+    auto invar = scope->FindVar(in_var_name);
+    framework::Variable* outvar = scope->FindVar(out_var_name);

-    request_handler_->Handle(varname, scope, invar, &outvar);
+    request_handler_->Handle(in_var_name, scope, invar, &outvar, out_var_name);

-    SerializeToByteBuffer(varname, outvar, *request_handler_->dev_ctx(),
+    SerializeToByteBuffer(out_var_name, outvar, *request_handler_->dev_ctx(),
                           &reply_);
     Finish(reply_, &responder_);
   }
@@ -287,7 +289,7 @@ void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name,
   } else if (rpc_name == kRequestPrefetch) {
     b = new RequestPrefetch(&service_, cq.get(), handler, req_id);
   } else {
-    PADDLE_ENFORCE(false, "not surpported rpc");
+    PADDLE_ENFORCE(false, "not supported rpc");
   }
   reqs[req_id] = b;
...
@@ -40,6 +40,7 @@ constexpr char kRequestPrefetch[] = "RequestPrefetch";
 #define LISTEN_TERMINATE_MESSAGE "TERMINATE@RECV"
 #define BATCH_BARRIER_MESSAGE "BATCH_BARRIER@RECV"
 #define FETCH_BARRIER_MESSAGE "FETCH_BARRIER@RECV"
+#define COMPLETE_MESSAGE "COMPLETE@RECV"

 class RPCServer;
@@ -60,9 +61,12 @@ class RequestHandler {
   void SetDevCtx(const platform::DeviceContext* dev_ctx) { dev_ctx_ = dev_ctx; }
   void SetProgram(framework::ProgramDesc* program) { program_ = program; }
   void SetExecutor(framework::Executor* executor) { executor_ = executor; }
+
+  // Used for dist lookup table prefetch
   void SetPrefetchPreparedCtx(
-      std::unique_ptr<framework::ExecutorPrepareContext> prepared) {
-    prefetch_ctx_.reset(prepared.release());
+      std::unordered_map<
+          std::string, std::shared_ptr<framework::ExecutorPrepareContext>>* g) {
+    prefetch_var_name_to_prepared_ctx_ = g;
   }

   // Used for async.
@@ -78,9 +82,6 @@ class RequestHandler {
   bool sync_mode() { return sync_mode_; }
   framework::Scope* scope() { return scope_; }
   const platform::DeviceContext* dev_ctx() { return dev_ctx_; }
-  framework::ExecutorPrepareContext* prefetch_ctx() {
-    return prefetch_ctx_.get();
-  }
   framework::ProgramDesc* program() { return program_; }
   framework::Executor* executor() { return executor_; }
@@ -99,8 +100,8 @@ class RequestHandler {
   //                          *request_handler_->dev_ctx(), &reply_);
   // }
   virtual bool Handle(const std::string& varname, framework::Scope* scope,
-                      framework::Variable* var,
-                      framework::Variable** outvar) = 0;
+                      framework::Variable* var, framework::Variable** outvar,
+                      const std::string& out_var_name = "") = 0;

  protected:
   const bool sync_mode_;
@@ -109,12 +110,17 @@ class RequestHandler {
   framework::Executor* executor_;
   framework::Scope* scope_;
   framework::ProgramDesc* program_;
-  std::unique_ptr<framework::ExecutorPrepareContext> prefetch_ctx_;
+
+  // used for distribute lookup table prefetch
+  std::unordered_map<std::string,
+                     std::shared_ptr<framework::ExecutorPrepareContext>>*
+      prefetch_var_name_to_prepared_ctx_;

   // Used for async.
   std::unordered_map<std::string,
                      std::shared_ptr<framework::ExecutorPrepareContext>>*
       grad_to_prepared_ctx_;

   RPCServer* rpc_server_;
 };
...
@@ -30,7 +30,8 @@ namespace detail {
 bool RequestSendHandler::Handle(const std::string& varname,
                                 framework::Scope* scope,
                                 framework::Variable* invar,
-                                framework::Variable** outvar) {
+                                framework::Variable** outvar,
+                                const std::string& out_var_name) {
   VLOG(4) << "RequestSendHandler:" << varname;

   // Async
@@ -49,6 +50,9 @@ bool RequestSendHandler::Handle(const std::string& varname,
     if (varname == BATCH_BARRIER_MESSAGE) {
       VLOG(3) << "sync: recv batch barrier message";
       rpc_server_->IncreaseBatchBarrier(kRequestSend);
+    } else if (varname == COMPLETE_MESSAGE) {
+      VLOG(3) << "sync: recv complete message";
+      rpc_server_->DecreaseClientNum();
     } else {
       VLOG(3) << "sync: received var_name: " << varname;
       if (sync_mode_) {
@@ -79,7 +83,8 @@ void RequestSendHandler::ResetSparseVarRecorder() {
 bool RequestGetHandler::Handle(const std::string& varname,
                                framework::Scope* scope,
                                framework::Variable* invar,
-                               framework::Variable** outvar) {
+                               framework::Variable** outvar,
+                               const std::string& out_var_name) {
   VLOG(4) << "RequestGetHandler:" << varname;

   if (varname != FETCH_BARRIER_MESSAGE) {
@@ -102,13 +107,14 @@ bool RequestGetHandler::Handle(const std::string& varname,
 bool RequestPrefetchHandler::Handle(const std::string& varname,
                                     framework::Scope* scope,
                                     framework::Variable* invar,
-                                    framework::Variable** outvar) {
+                                    framework::Variable** outvar,
+                                    const std::string& out_var_name) {
   VLOG(4) << "RequestPrefetchHandler " << varname;

-  auto var_desc = program_->Block(0).FindVar(varname);
-  *outvar = scope->FindVar(varname);
+  auto var_desc = program_->Block(0).FindVar(out_var_name);
   InitializeVariable(*outvar, var_desc->GetType());
-  executor_->RunPreparedContext(prefetch_ctx_.get(), scope);
+  executor_->RunPreparedContext(
+      (*prefetch_var_name_to_prepared_ctx_)[varname].get(), scope);

   return true;
 }
...
@@ -39,7 +39,8 @@ class RequestSendHandler final : public RequestHandler {
   explicit RequestSendHandler(bool sync_mode) : RequestHandler(sync_mode) {}
   virtual ~RequestSendHandler() {}
   bool Handle(const std::string& varname, framework::Scope* scope,
-              framework::Variable* var, framework::Variable** outvar) override;
+              framework::Variable* var, framework::Variable** outvar,
+              const std::string& out_var_name = "") override;
   void ResetSparseVarRecorder();

  private:
@@ -52,7 +53,8 @@ class RequestGetHandler final : public RequestHandler {
   explicit RequestGetHandler(bool sync_mode) : RequestHandler(sync_mode) {}
   virtual ~RequestGetHandler() {}
   bool Handle(const std::string& varname, framework::Scope* scope,
-              framework::Variable* var, framework::Variable** outvar) override;
+              framework::Variable* var, framework::Variable** outvar,
+              const std::string& out_var_name = "") override;
 };

 class RequestPrefetchHandler final : public RequestHandler {
@@ -60,7 +62,8 @@ class RequestPrefetchHandler final : public RequestHandler {
   explicit RequestPrefetchHandler(bool sync_mode) : RequestHandler(sync_mode) {}
   virtual ~RequestPrefetchHandler() {}
   bool Handle(const std::string& varname, framework::Scope* scope,
-              framework::Variable* var, framework::Variable** outvar) override;
+              framework::Variable* var, framework::Variable** outvar,
+              const std::string& out_var_name = "") override;
 };

 }  // namespace detail
...
@@ -53,6 +53,11 @@ class RPCClient {
   virtual void AsyncSendFetchBarrier(const std::string& ep,
                                      int64_t time_out = rpc_time_out) = 0;

+  // SendComplete tells all the servers that the current trainer has no more
+  // data to train, so that the pserver can reduce its barrier count and
+  // continue training with the other trainers.
+  virtual void SendComplete() = 0;
+
   virtual void Wait() = 0;

   static constexpr int64_t rpc_time_out = 120 * 1000;
...
@@ -43,7 +43,7 @@ void RPCServer::SavePort() const {
 void RPCServer::WaitBarrier(const std::string& rpc_name) {
   std::unique_lock<std::mutex> lock(this->mutex_);
-  barrier_cond_.wait(lock, [=] {
+  barrier_cond_.wait(lock, [this, &rpc_name] {
     return (barrier_counter_[rpc_name] >= client_num_ || exit_flag_.load());
   });
@@ -53,17 +53,21 @@ void RPCServer::WaitBarrier(const std::string& rpc_name) {
 void RPCServer::IncreaseBatchBarrier(const std::string rpc_name) {
   VLOG(3) << "RPCServer begin IncreaseBatchBarrier " << rpc_name;
   int b = 0;
-  std::unique_lock<std::mutex> lock(mutex_);
-  b = ++barrier_counter_[rpc_name];
-  if (b >= client_num_) {
-    lock.unlock();
-    barrier_cond_.notify_all();
-    lock.lock();
-  }
-}
+  {
+    std::unique_lock<std::mutex> lock(mutex_);
+    b = ++barrier_counter_[rpc_name];
+  }
+
+  VLOG(3) << "RPCServer IncreaseBatchBarrier " << rpc_name
+          << ", barrier_count:" << b << ", fan_in" << client_num_;
+
+  if (b >= client_num_) {
+    barrier_cond_.notify_all();
+  }
+}
+
+void RPCServer::DecreaseClientNum() {
+  {
+    std::unique_lock<std::mutex> lock(mutex_);
+    client_num_--;
+  }
+  barrier_cond_.notify_all();
+}

 void RPCServer::ResetBarrierCounter() {
...
@@ -60,7 +60,7 @@ class RPCServer {
   void SetCond(const std::string& rpc_name);
   void WaitCond(const std::string& rpc_name);
   void IncreaseBatchBarrier(const std::string rpc_name);
+  void DecreaseClientNum();

   void ResetBarrierCounter();

  protected:
@@ -79,8 +79,7 @@ class RPCServer {
   std::string bind_address_;
   std::atomic<int> exit_flag_;
   int selected_port_;
-
-  const int client_num_;
+  int client_num_;

   std::unordered_map<std::string, RequestHandler*> rpc_call_map_;
   std::unordered_map<std::string, int> rpc_thread_num_;
...
@@ -98,11 +98,17 @@ void StartServer() {
   framework::Executor exe(place);
   platform::CPUDeviceContext ctx(place);
   auto* block = AppendPrefetchBlcok(&program);
-  auto prepared = exe.Prepare(program, block->ID());
+  std::string in_var_name("ids");
+  std::vector<int> prefetch_block_ids{block->ID()};
+  auto prepared = exe.Prepare(program, prefetch_block_ids);
   InitTensorsOnServer(&scope, &place, 10);

+  std::unordered_map<std::string,
+                     std::shared_ptr<framework::ExecutorPrepareContext>>
+      prefetch_var_name_to_prepared;
+  prefetch_var_name_to_prepared[in_var_name] = prepared[0];
   g_req_handler->SetProgram(&program);
-  g_req_handler->SetPrefetchPreparedCtx(std::move(prepared));
+  g_req_handler->SetPrefetchPreparedCtx(&prefetch_var_name_to_prepared);
   g_req_handler->SetDevCtx(&ctx);
   g_req_handler->SetScope(&scope);
   g_req_handler->SetExecutor(&exe);
...
@@ -66,40 +66,41 @@ class ElementwiseOpMaker : public framework::OpProtoAndCheckerMaker {
         .SetDefault(-1)
         .EqualGreaterThan(-1);
     AddComment(string::Sprintf(R"DOC(
-Limited Elementwise %s Operator.
+Limited Elementwise %s Operator

 The equation is:

 $$%s$$

-$X$ is a tensor of any dimension and the dimensions of tensor $Y$ must be
-smaller than or equal to the dimensions of $X$.
+- $X$: a tensor of any dimension.
+- $Y$: a tensor whose dimensions must be less than or equal to the dimensions of $X$.

 There are two cases for this operator:

-1. The shape of $Y$ is same with $X$;
-2. The shape of $Y$ is a congiguous subsequencet of $X$. The trailing dimensions
-   of size 1 for $Y$ will be ignored for the consideration of subsequence.
+1. The shape of $Y$ is the same with $X$.
+2. The shape of $Y$ is a continuous subsequence of $X$.

 For case 2:

-$Y$ will be broadcasted to match the shape of $X$ and axis should be
-set to index of the start dimension to broadcast $Y$ onto $X$.
-
-If axis is -1, it is treated as axis=rank(X)-rank(Y).
-
-For example
+1. Broadcast $Y$ to match the shape of $X$, where $axis$ is the start dimension index
+   for broadcasting $Y$ onto $X$.
+2. If $axis$ is -1 (default), $axis = rank(X) - rank(Y)$.
+3. The trailing dimensions of size 1 for $Y$ will be ignored for the consideration of
+   subsequence, such as shape(Y) = (2, 1) => (2).
+
+For example:

   .. code-block:: python

     shape(X) = (2, 3, 4, 5), shape(Y) = (,)
     shape(X) = (2, 3, 4, 5), shape(Y) = (5,)
-    shape(X) = (2, 3, 4, 5), shape(Y) = (4, 5)
+    shape(X) = (2, 3, 4, 5), shape(Y) = (4, 5), with axis=-1(default) or axis=2
     shape(X) = (2, 3, 4, 5), shape(Y) = (3, 4), with axis=1
     shape(X) = (2, 3, 4, 5), shape(Y) = (2), with axis=0
     shape(X) = (2, 3, 4, 5), shape(Y) = (2, 1), with axis=0

-Either of the inputs $X$ and $Y$ or none can carry the LoD (Level of Details)
-information. However, the output only shares the LoD information with input $X$.
+The inputs $X$ and $Y$ can carry the different LoD information.
+But the output only shares the LoD information with the input $X$.

 )DOC",
                                GetName(), GetEquation()));
...
@@ -96,19 +96,22 @@ static int64_t GetTimestamp() {
   return tp.tv_sec * 1000 + tp.tv_usec / 1000;
 }

-void ListenAndServOp::RunSyncLoop(framework::Executor *executor,
-                                  framework::ProgramDesc *program,
-                                  framework::Scope *recv_scope,
-                                  framework::BlockDesc *prefetch_block) const {
+void ListenAndServOp::RunSyncLoop(
+    framework::Executor *executor, framework::ProgramDesc *program,
+    framework::Scope *recv_scope,
+    const std::vector<int> &prefetch_block_id_list) const {
   size_t num_blocks = program->Size();
   PADDLE_ENFORCE_GE(num_blocks, 2,
                     "server program should have at least 2 blocks");

-  std::vector<int> block_list;
-  for (size_t blkid = 1; blkid < num_blocks; ++blkid) {
-    block_list.push_back(blkid);
+  std::vector<int> optimize_block_id_list;
+  for (int blkid = 1; blkid < num_blocks; ++blkid) {
+    if (std::find(prefetch_block_id_list.begin(), prefetch_block_id_list.end(),
+                  blkid) == prefetch_block_id_list.end()) {
+      optimize_block_id_list.push_back(blkid);
+    }
   }
-  auto optimize_prepared = executor->Prepare(*program, block_list);
+  auto optimize_prepared = executor->Prepare(*program, optimize_block_id_list);
   // Insert placeholder for block0 which holds current op itself.
   optimize_prepared.insert(
       optimize_prepared.begin(),
@@ -135,8 +138,10 @@ void ListenAndServOp::RunSyncLoop(framework::Executor *executor,
       std::vector<size_t> parallel_blkids;
       parallel_blkids.push_back(1);
       double ts = GetTimestamp();
-      for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
-        if (blkid != static_cast<size_t>(prefetch_block->ID())) {
+      for (size_t i = 1; i < optimize_block_id_list.size(); ++i) {
+        // skip the first optimize block because it is already in the
+        // parallel_blkids.
+        int blkid = optimize_block_id_list[i];
         if (program->Block(blkid).Parent() != last_parent_blkid) {
           ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared,
                                 program, recv_scope);
@@ -145,7 +150,6 @@ void ListenAndServOp::RunSyncLoop(framework::Executor *executor,
         }
         parallel_blkids.push_back(blkid);
       }
-      }
       ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared,
                             program, recv_scope);
       VLOG(2) << "run all blocks spent " << GetTimestamp() - ts << "(ms)";
@@ -210,18 +214,19 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor,
   }  // while(true)
 }

-static void FillRequestCtx(detail::RequestHandler *h, framework::Scope *scope,
-                           platform::DeviceContext *dev_ctx,
-                           framework::Executor *executor,
-                           framework::ProgramDesc *program,
-                           framework::ExecutorPrepareContext *prefetch_ctx,
-                           detail::RPCServer *rpc_server) {
+static void FillRequestCtx(
+    detail::RequestHandler *h, framework::Scope *scope,
+    platform::DeviceContext *dev_ctx, framework::Executor *executor,
+    framework::ProgramDesc *program,
+    std::unordered_map<std::string,
+                       std::shared_ptr<framework::ExecutorPrepareContext>>
+        *prefetch_ctx,
+    detail::RPCServer *rpc_server) {
   h->SetScope(scope);
   h->SetDevCtx(dev_ctx);
   h->SetExecutor(executor);
   h->SetProgram(program);
-  h->SetPrefetchPreparedCtx(
-      std::unique_ptr<framework::ExecutorPrepareContext>(prefetch_ctx));
+  h->SetPrefetchPreparedCtx(prefetch_ctx);
   h->SetRPCServer(rpc_server);
 }
@@ -255,17 +260,42 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
                                        request_prefetch_handler_.get());

   auto *optimize_block = Attr<framework::BlockDesc *>(kOptimizeBlock);
-  auto *prefetch_block = Attr<framework::BlockDesc *>(kPrefetchBlock);
   auto *program = optimize_block->Program();
   framework::Executor executor(dev_place);

   // prepare for prefetch
-  VLOG(3) << "prefetch block id is " << prefetch_block->ID();
-  auto prefetch_prepared = executor.Prepare(*program, prefetch_block->ID());
+  std::vector<int> prefetch_block_id_list;
+  std::unordered_map<int, std::string> block_id_to_prefetch_var_name;
+
+  auto prefetch_var_name_to_block_id_str =
+      Attr<std::vector<std::string>>(kPrefetchVarNameToBlockId);
+  for (const auto &prefetch_var_name_and_id :
+       prefetch_var_name_to_block_id_str) {
+    std::vector<std::string> pieces;
+    split(prefetch_var_name_and_id, ':', &pieces);
+    VLOG(3) << "after split, prefetch_var = " << pieces[0]
+            << ", id=" << pieces[1];
+    PADDLE_ENFORCE_EQ(pieces.size(), 2);
+
+    int block_id = std::stoi(pieces[1]);
+    prefetch_block_id_list.push_back(block_id);
+    block_id_to_prefetch_var_name[block_id] = pieces[0];
+  }
+
+  auto prefetch_prepared = executor.Prepare(*program, prefetch_block_id_list);
+
+  std::unordered_map<std::string,
+                     std::shared_ptr<framework::ExecutorPrepareContext>>
+      prefetch_var_name_to_prepared_ctx;
+  for (size_t i = 0; i < prefetch_block_id_list.size(); ++i) {
+    auto block_id = prefetch_block_id_list[i];
+    auto prefetch_var_name = block_id_to_prefetch_var_name[block_id];
+    prefetch_var_name_to_prepared_ctx[prefetch_var_name] = prefetch_prepared[i];
+  }
+
   auto f = std::bind(FillRequestCtx, std::placeholders::_1, &recv_scope,
-                     &dev_ctx, &executor, program, prefetch_prepared.release(),
-                     rpc_service_.get());
+                     &dev_ctx, &executor, program,
+                     &prefetch_var_name_to_prepared_ctx, rpc_service_.get());

   f(request_send_handler_.get());
   f(request_get_handler_.get());
@@ -283,7 +313,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
   // Write to a file of server selected port for python use.
   SavePort();
   if (sync_mode) {
-    RunSyncLoop(&executor, program, &recv_scope, prefetch_block);
+    RunSyncLoop(&executor, program, &recv_scope, prefetch_block_id_list);
   } else {
     RunAsyncLoop(&executor, program);
   }
@@ -309,8 +339,9 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
     AddAttr<bool>("sync_mode", "if works at sync_mode or not").SetDefault(true);
     AddAttr<framework::BlockDesc *>(kOptimizeBlock,
                                     "BlockID to run on server side.");
-    AddAttr<framework::BlockDesc *>(kPrefetchBlock,
-                                    "prefetch block to run on server side.");
+    AddAttr<std::vector<std::string>>(kPrefetchVarNameToBlockId,
+                                      "prefetch blocks to run on server side.")
+        .SetDefault({});
     AddAttr<int>("Fanin", "How many clients send to this server.")
         .SetDefault(1);
   }
...
@@ -18,6 +18,7 @@ limitations under the License. */
 #include <atomic>
 #include <set>
 #include <string>
+#include <vector>

 #include "paddle/fluid/framework/executor.h"
 #include "paddle/fluid/framework/lod_tensor.h"
@@ -30,7 +31,7 @@ namespace paddle {
 namespace operators {

 constexpr char kOptimizeBlock[] = "OptimizeBlock";
-constexpr char kPrefetchBlock[] = "PrefetchBlock";
+constexpr char kPrefetchVarNameToBlockId[] = "prefetch_var_name_to_block_id";

 void RunServer(std::shared_ptr<detail::RPCServer> service);
@@ -46,7 +47,7 @@ class ListenAndServOp : public framework::OperatorBase {
   void RunSyncLoop(framework::Executor* executor,
                    framework::ProgramDesc* program,
                    framework::Scope* recv_scope,
-                   framework::BlockDesc* prefetch_block) const;
+                   const std::vector<int>& prefetch_block_id_list) const;

   void RunAsyncLoop(framework::Executor* executor,
                     framework::ProgramDesc* program) const;
...
@@ -16,40 +16,34 @@ limitations under the License. */
 namespace paddle {
 namespace operators {

-template <typename AttrType>
 class NormOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   void Make() override {
-    AddInput(
-        "X",
-        "(Tensor) The input tensor of norm operator. "
-        "The format of input tensor is NCHW. Where N is batch size, C is the "
-        "number of channels, H and W is the height and width of feature.");
-    AddInput("Scale",
-             "(Tensor) The input tensor of norm operator. "
-             "The format of input tensor is C * 1.");
-    AddAttr<AttrType>("epsilon",
-                      "(float, default 1e-10) Constant "
-                      "for numerical stability.")
+    AddInput("X", "(Tensor) A tensor of rank >= axis.");
+    AddAttr<int>("axis",
+                 "The axis on which to apply normalization. If axis < 0, "
+                 "the dimension to normalization is rank(X) + axis. -1 is "
+                 "the last dimension.");
+    AddAttr<float>("epsilon",
+                   "(float, default 1e-10) The epsilon value is used "
+                   "to avoid division by zero.")
         .SetDefault(1.0e-10f);
-    AddOutput("Out",
-              "(Tensor) The output tensor of norm operator."
-              "N * M."
-              "M = C * H * W");
+    AddOutput("Norm",
+              "(Tensor) A tensor saved the `sqrt(sum(x) + epsion)` will "
+              "be used in backward kernel.")
+        .AsIntermediate();
+    AddOutput("Out", "(Tensor) A tensor of the same shape as X.");
     AddComment(R"DOC(
-"Input shape: $(N, C, H, W)$
-Scale shape: $(C, 1)$
-Output shape: $(N, C, H, W)$
-Where
-forward
-  $$
-  [\frac {x_{1}}{\sqrt{\sum{x_{i}^{2}}}} \frac {x_{2}}{\sqrt{\sum{x_{i}^{2}}}} \frac {x_{3}}{\sqrt{\sum{x_{i}^{2}}}} \cdot \cdot \cdot \frac {x_{n}}{\sqrt{\sum{x_{i}^{2}}}}]
-  $$
-backward
-  $$
-  \frac{\frac{\mathrm{d}L }{\mathrm{d}y_{1}} - \frac {x_{1}\sum {\frac{\mathrm{d} L}{\mathrm{d} y_{j}}}x_{j}}{\sum x_{j}^{2}} }{\sqrt{\sum{x_{j}^{2}}}}
-  $$
-)DOC");
+
+Given a tensor, apply 2-normalization along the provided axis.
+
+$$
+y = \frac{x}{ \sqrt{\sum {x^2} + epsion }}
+$$
+
+where, $\sum {x^2}$ is calculated along the `axis` dimension.
+
+)DOC");
   }
 };
@@ -58,15 +52,15 @@ class NormOp : public framework::OperatorWithKernel {
   using framework::OperatorWithKernel::OperatorWithKernel;
   void InferShape(framework::InferShapeContext* ctx) const override {
     PADDLE_ENFORCE(ctx->HasInput("X"),
-                   "Input(X) of NormOp"
-                   "should not be null.");
-    PADDLE_ENFORCE(ctx->HasInput("Scale"),
-                   "Input(Scale) of NormOp"
-                   "should not be null.");
+                   "Input(X) of NormOp should not be null.");
     PADDLE_ENFORCE(ctx->HasOutput("Out"),
                    "Output(Out) of NormOp should not be null.");
-    auto in_x_dims = ctx->GetInputDim("X");
-    ctx->SetOutputDim("Out", in_x_dims);
+    auto xdim = ctx->GetInputDim("X");
+    ctx->SetOutputDim("Out", xdim);
+    int axis = ctx->Attrs().Get<int>("axis");
+    if (axis < 0) axis = xdim.size() + axis;
+    xdim[axis] = 1;
+    ctx->SetOutputDim("Norm", xdim);
   }
 };
@@ -84,12 +78,12 @@ class NormOpGrad : public framework::OperatorWithKernel {
 }  // namespace paddle

 namespace ops = paddle::operators;
-REGISTER_OPERATOR(norm, ops::NormOp, ops::NormOpMaker<float>,
+using CPU = paddle::platform::CPUDeviceContext;
+REGISTER_OPERATOR(norm, ops::NormOp, ops::NormOpMaker,
                   paddle::framework::DefaultGradOpDescMaker<true>);
 REGISTER_OPERATOR(norm_grad, ops::NormOpGrad);
-REGISTER_OP_CPU_KERNEL(
-    norm, ops::NormKernel<paddle::platform::CPUDeviceContext, float>,
-    ops::NormKernel<paddle::platform::CPUDeviceContext, double, float>);
-REGISTER_OP_CPU_KERNEL(
-    norm_grad, ops::NormGradKernel<paddle::platform::CPUDeviceContext, float>,
-    ops::NormGradKernel<paddle::platform::CPUDeviceContext, double, float>);
+REGISTER_OP_CPU_KERNEL(norm, ops::NormKernel<CPU, float>,
+                       ops::NormKernel<CPU, double>);
+REGISTER_OP_CPU_KERNEL(norm_grad, ops::NormGradKernel<CPU, float>,
+                       ops::NormGradKernel<CPU, double>);
@@ -16,9 +16,9 @@ limitations under the License. */
 #include "paddle/fluid/operators/norm_op.h"

 namespace ops = paddle::operators;
-REGISTER_OP_CUDA_KERNEL(
-    norm, ops::NormKernel<paddle::platform::CUDADeviceContext, float>,
-    ops::NormKernel<paddle::platform::CUDADeviceContext, double, float>);
-REGISTER_OP_CUDA_KERNEL(
-    norm_grad, ops::NormGradKernel<paddle::platform::CUDADeviceContext, float>,
-    ops::NormGradKernel<paddle::platform::CUDADeviceContext, double, float>);
+using CUDA = paddle::platform::CUDADeviceContext;
+
+REGISTER_OP_CUDA_KERNEL(norm, ops::NormKernel<CUDA, float>,
+                        ops::NormKernel<CUDA, double>);
+REGISTER_OP_CUDA_KERNEL(norm_grad, ops::NormGradKernel<CUDA, float>,
+                        ops::NormGradKernel<CUDA, double>);
...@@ -19,156 +19,110 @@ limitations under the License. */

namespace paddle {
namespace operators {

inline void GetDims(const framework::DDim& dim, int axis, int* pre, int* n,
                    int* post) {
  *pre = 1;
  *post = 1;
  *n = dim[axis];
  for (int i = 0; i < axis; ++i) {
    (*pre) *= dim[i];
  }
  for (int i = axis + 1; i < dim.size(); ++i) {
    (*post) *= dim[i];
  }
}

template <typename DeviceContext, typename T>
class NormKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext& ctx) const override {
    auto* in_x = ctx.Input<framework::Tensor>("X");
    auto* out_y = ctx.Output<framework::Tensor>("Out");
    auto* out_norm = ctx.Output<framework::Tensor>("Norm");
    out_y->mutable_data<T>(ctx.GetPlace());
    out_norm->mutable_data<T>(ctx.GetPlace());

    auto xdim = in_x->dims();
    T eps = static_cast<T>(ctx.Attr<float>("epsilon"));
    int axis = ctx.Attr<int>("axis");
    if (axis < 0) axis = xdim.size() + axis;
    int pre, n, post;
    GetDims(xdim, axis, &pre, &n, &post);

    auto* place = ctx.template device_context<DeviceContext>().eigen_device();

    Eigen::DSizes<int, 3> shape(pre, n, post);
    Eigen::DSizes<int, 2> norm_shape(pre, post);

    auto x_e = framework::EigenVector<T>::Flatten(*in_x);
    auto y_e = framework::EigenVector<T>::Flatten(*out_y);
    auto norm_e = framework::EigenVector<T>::Flatten(*out_norm);

    auto x = x_e.reshape(shape);
    auto y = y_e.reshape(shape);
    auto norm = norm_e.reshape(norm_shape);

    Eigen::DSizes<int, 1> rdim(1);
    // norm = sqrt(sum(x * x) + epsilon)
    auto sum = x.pow(2).sum(rdim) + eps;
    norm.device(*place) = sum.sqrt();
    // y = x / norm
    Eigen::DSizes<int, 3> rshape(pre, 1, post);
    Eigen::DSizes<int, 3> bcast(1, n, 1);
    y.device(*place) = x / norm.reshape(rshape).broadcast(bcast);
  }
};

template <typename DeviceContext, typename T, typename AttrType = T>
class NormGradKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext& ctx) const override {
    auto* in_x = ctx.Input<framework::Tensor>("X");
    auto* in_norm = ctx.Input<framework::Tensor>("Norm");
    auto* in_dy = ctx.Input<framework::Tensor>(framework::GradVarName("Out"));
    auto* out_dx = ctx.Output<framework::Tensor>(framework::GradVarName("X"));
    out_dx->mutable_data<T>(ctx.GetPlace());

    auto xdim = in_x->dims();
    int axis = ctx.Attr<int>("axis");
    if (axis < 0) axis = xdim.size() + axis;
    int pre, n, post;
    GetDims(xdim, axis, &pre, &n, &post);

    auto* place = ctx.template device_context<DeviceContext>().eigen_device();

    auto x_e = framework::EigenVector<T>::Flatten(*in_x);
    auto dy_e = framework::EigenVector<T>::Flatten(*in_dy);
    auto norm_e = framework::EigenVector<T>::Flatten(*in_norm);
    auto dx_e = framework::EigenVector<T>::Flatten(*out_dx);

    Eigen::DSizes<int, 3> shape(pre, n, post);
    Eigen::DSizes<int, 2> norm_shape(pre, post);
    auto x = x_e.reshape(shape);
    auto dy = dy_e.reshape(shape);
    auto norm = norm_e.reshape(norm_shape);
    auto dx = dx_e.reshape(shape);

    framework::Tensor rsum;
    rsum.mutable_data<T>({pre, post}, ctx.GetPlace());
    auto sum = framework::EigenTensor<T, 2>::From(rsum);

    Eigen::DSizes<int, 1> rdim(1);
    Eigen::DSizes<int, 3> bcast(1, n, 1);
    Eigen::DSizes<int, 3> rshape(pre, 1, post);

    // dx = [dy - x * sum(x*dy) / (sum(x*x) + e)] / sqrt(sum(x*x) + e)
    // 1. sum = sum(x*dy)
    sum.device(*place) = (x * dy).sum(rdim);
    // 2. dx = x * sum
    dx.device(*place) = sum.reshape(rshape).broadcast(bcast) * x;
    // 3. dx / (sum(x*x) + e)
    // where norm.pow(2) = sum(x*x) + e, which is computed in the forward pass.
    dx.device(*place) = dx / norm.pow(2).broadcast(bcast);
    // 4. [dy - dx] / sqrt(sum(x*x) + e)
    dx.device(*place) = (dy - dx) / norm.broadcast(bcast);
  }
};

}  // namespace operators
......
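As a sanity check on the formulas in the kernels above, here is a minimal NumPy sketch (not part of the commit) that mirrors the forward math and the final gradient expression, verified against a finite difference:

```python
import numpy as np

def l2_norm_fwd(x, axis, eps=1e-8):
    # norm = sqrt(sum(x*x) + eps); y = x / norm
    norm = np.sqrt((x * x).sum(axis=axis, keepdims=True) + eps)
    return x / norm, norm

def l2_norm_grad(x, dy, axis, eps=1e-8):
    # dx = (dy - x * sum(x*dy) / norm^2) / norm, where norm^2 = sum(x*x) + eps
    _, norm = l2_norm_fwd(x, axis, eps)
    s = (x * dy).sum(axis=axis, keepdims=True)
    return (dy - x * s / (norm ** 2)) / norm

x = np.random.rand(2, 3, 4)
dy = np.random.rand(2, 3, 4)
dx = l2_norm_grad(x, dy, axis=1)

# Finite-difference check of one coordinate of d(sum(y * dy))/dx.
idx, delta = (0, 1, 2), 1e-6
x_p, x_m = x.copy(), x.copy()
x_p[idx] += delta
x_m[idx] -= delta
num = ((l2_norm_fwd(x_p, 1)[0] * dy).sum() -
       (l2_norm_fwd(x_m, 1)[0] * dy).sum()) / (2 * delta)
assert abs(dx[idx] - num) < 1e-4
```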
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/slice_op.h"
#include <algorithm>
#include <vector>
namespace paddle {
namespace operators {
using Tensor = framework::Tensor;
class SliceOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
void InferShape(framework::InferShapeContext *ctx) const override {
PADDLE_ENFORCE(ctx->HasInput("Input"),
"Input (Input) of slice op should not be null.");
PADDLE_ENFORCE(ctx->HasOutput("Out"),
"Output (Out) of slice op should not be null.");
auto in_dims = ctx->GetInputDim("Input");
PADDLE_ENFORCE(in_dims.size() < 7,
"The rank of input should be less than 7.");
framework::DDim out_dims(in_dims);
auto axes = ctx->Attrs().Get<std::vector<int>>("axes");
auto starts = ctx->Attrs().Get<std::vector<int>>("starts");
auto ends = ctx->Attrs().Get<std::vector<int>>("ends");
PADDLE_ENFORCE_EQ(starts.size(), ends.size());
PADDLE_ENFORCE_EQ(starts.size(), axes.size());
int dim_value, start, end;
for (size_t i = 0; i < axes.size(); ++i) {
dim_value = out_dims[axes[i]];
start = starts[i] < 0 ? (starts[i] + dim_value) : starts[i];
end = ends[i] < 0 ? (ends[i] + dim_value) : ends[i];
start = std::max(start, 0);
end = std::max(end, 0);
start = std::min(start, dim_value);
end = std::min(end, dim_value);
start = std::min(start, end);
out_dims[axes[i]] = end - start;
}
ctx->SetOutputDim("Out", out_dims);
}
protected:
framework::OpKernelType GetExpectedKernelType(
const framework::ExecutionContext &ctx) const override {
return framework::OpKernelType(
framework::ToDataType(ctx.Input<Tensor>("Input")->type()),
ctx.GetPlace());
}
};
class SliceOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddInput("Input", "Tensor of data to extract slices from.");
AddOutput("Out", "Sliced data tensor.");
AddAttr<std::vector<int>>(
"axes",
"(list<int>) Axes that `starts` and `ends` apply to. It's optional."
"If not present, will be treated as [0, 1, ..., len(`starts`) - 1].");
AddAttr<std::vector<int>>(
"starts",
"(list<int>) Starting indices of corresponding axis in `axes`");
AddAttr<std::vector<int>>(
"ends",
"(list<int>) Starting indices of corresponding axis in `axes`.");
AddComment(R"DOC(
Slice Operator.
Produces a slice of the input tensor along multiple axes. Similar to numpy:
https://docs.scipy.org/doc/numpy/reference/arrays.indexing.html
Slice uses the `axes`, `starts` and `ends` attributes to specify the start and
end indices for each axis in the list of axes, and uses this information to
slice the input data tensor. If a negative value is passed for any of the
start or end indices, it counts from the end of that dimension. If the value
passed as start or end is larger than n (the number of elements in this
dimension), it is clamped to n. For slicing to the end of a dimension of
unknown size, it is recommended to pass in INT_MAX. If `axes` is omitted,
it defaults to [0, ..., ndim-1].
Example 1:
Given:
data = [ [1, 2, 3, 4], [5, 6, 7, 8], ]
axes = [0, 1]
starts = [1, 0]
ends = [2, 3]
Then:
result = [ [5, 6, 7], ]
Example 2:
Given:
data = [ [1, 2, 3, 4], [5, 6, 7, 8], ]
starts = [0, 1]
ends = [-1, 1000]
Then:
result = [ [2, 3, 4], ]
)DOC");
}
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OPERATOR(slice, ops::SliceOp, ops::SliceOpMaker,
paddle::framework::EmptyGradOpMaker);
REGISTER_OP_CPU_KERNEL(
slice, ops::SliceKernel<paddle::platform::CPUDeviceContext, int>,
ops::SliceKernel<paddle::platform::CPUDeviceContext, int64_t>,
ops::SliceKernel<paddle::platform::CPUDeviceContext, float>,
ops::SliceKernel<paddle::platform::CPUDeviceContext, double>);
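A small NumPy sketch (not part of the commit) of the index-resolution rules implemented in `InferShape` above: resolve negative indices, clamp to the dimension size, and force start <= end.

```python
import numpy as np

def slice_ref(data, axes, starts, ends):
    # Mirrors SliceOp::InferShape's handling of negative and
    # out-of-range indices.
    slices = [slice(None)] * data.ndim
    for axis, start, end in zip(axes, starts, ends):
        dim = data.shape[axis]
        if start < 0:
            start += dim
        if end < 0:
            end += dim
        start = min(max(start, 0), dim)
        end = min(max(end, 0), dim)
        start = min(start, end)
        slices[axis] = slice(start, end)
    return data[tuple(slices)]

data = np.array([[1, 2, 3, 4], [5, 6, 7, 8]])
print(slice_ref(data, axes=[0, 1], starts=[1, 0], ends=[2, 3]))  # [[5 6 7]]
print(slice_ref(data, axes=[0, 1], starts=[0, 1], ends=[-1, 1000]))  # [[2 3 4]]
```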
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/slice_op.h"
namespace ops = paddle::operators;
REGISTER_OP_CUDA_KERNEL(
slice, ops::SliceKernel<paddle::platform::CUDADeviceContext, float>,
ops::SliceKernel<paddle::platform::CUDADeviceContext, double>,
ops::SliceKernel<paddle::platform::CUDADeviceContext, int>,
ops::SliceKernel<paddle::platform::CUDADeviceContext, int64_t>);
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <algorithm>
#include <vector>
#include "paddle/fluid/framework/op_registry.h"
namespace paddle {
namespace operators {
template <typename DeviceContext, typename T>
class SliceKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
int rank = ctx.Input<framework::Tensor>("Input")->dims().size();
switch (rank) {
case 1:
SliceCompute<1>(ctx);
break;
case 2:
SliceCompute<2>(ctx);
break;
case 3:
SliceCompute<3>(ctx);
break;
case 4:
SliceCompute<4>(ctx);
break;
case 5:
SliceCompute<5>(ctx);
break;
case 6:
SliceCompute<6>(ctx);
break;
}
}
private:
template <size_t D>
void SliceCompute(const framework::ExecutionContext& context) const {
auto& place =
*context.template device_context<DeviceContext>().eigen_device();
auto in = context.Input<framework::Tensor>("Input");
auto out = context.Output<framework::Tensor>("Out");
out->mutable_data<T>(context.GetPlace());
auto out_dims = out->dims();
auto in_dims = in->dims();
auto axes = context.Attr<std::vector<int>>("axes");
auto starts = context.Attr<std::vector<int>>("starts");
auto offsets = Eigen::array<int, D>();
auto extents = Eigen::array<int, D>();
for (size_t i = 0; i < D; ++i) {
offsets[i] = 0;
extents[i] = out_dims[i];
}
int start;
for (size_t i = 0; i < axes.size(); ++i) {
start = starts[i];
if (start < 0) {
start = (start + in_dims[axes[i]]);
}
start = std::max(start, 0);
offsets[axes[i]] = start;
}
auto in_t =
framework::EigenTensor<T, D, Eigen::RowMajor, Eigen::DenseIndex>::From(
*in);
auto out_t =
framework::EigenTensor<T, D, Eigen::RowMajor, Eigen::DenseIndex>::From(
*out);
out_t.device(place) = in_t.slice(offsets, extents);
}
};
} // namespace operators
} // namespace paddle
...@@ -413,6 +413,9 @@ All parameter, weight, gradient are variables in Paddle.
  py::class_<framework::Executor>(m, "Executor")
      .def(py::init<const platform::Place &>())
#ifdef PADDLE_WITH_DISTRIBUTE
      .def("complete", &Executor::Complete)
#endif
      .def("run",
           (void (Executor::*)(const ProgramDesc &, Scope *, int, bool, bool)) &
               Executor::Run);
......
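The `complete` method is only bound when PADDLE_WITH_DISTRIBUTE is defined. A hypothetical sketch of calling it from Python (the flow and intent are assumptions, not from the commit):

```python
import paddle.fluid.core as core

place = core.CPUPlace()
exe = core.Executor(place)
# ... run trainer iterations against parameter servers ...
# Presumably signals the pservers that this trainer has finished, so a
# sync-mode server can stop waiting on it (assumption; only bound in
# builds with distributed support).
exe.complete()
```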
...@@ -181,6 +181,7 @@ function build() {
    ============================================
EOF
    make clean
    make -j `nproc`
    make install -j `nproc`
}
......
...@@ -71,6 +71,7 @@ __all__ = [
    'cumsum',
    'scatter',
    'sum',
    'slice',
    'polygon_box_transform',
    'shape',
    'maxout',
......
...@@ -387,6 +387,12 @@ class TestBook(unittest.TestCase):
        self.assertIsNotNone(output)
        print(str(program))

    def test_l2_normalize(self):
        program = Program()
        with program_guard(program):
            x = layers.data(name='x', shape=[8, 7, 10], dtype="float32")
            output = layers.l2_normalize(x, axis=1)

    def test_maxout(self):
        program = Program()
        with program_guard(program):
......
...@@ -17,44 +17,23 @@ import numpy as np
from op_test import OpTest


def l2_norm(x, axis, epsilon):
    x2 = x**2
    s = np.sum(x2, axis=axis, keepdims=True)
    r = np.sqrt(s + epsilon)
    y = x / np.broadcast_to(r, x.shape)
    return y, r


class TestNormOp(OpTest):
    def setUp(self):
        self.op_type = "norm"
        self.init_test_case()
        x = np.random.random(self.shape).astype("float64")
        y, norm = l2_norm(x, self.axis, self.epsilon)
        self.inputs = {'X': x}
        self.attrs = {'epsilon': self.epsilon, 'axis': self.axis}
        self.outputs = {'Out': y, 'Norm': norm}

    def test_check_output(self):
        self.check_output()
...@@ -63,8 +42,23 @@ class TestNormOp(OpTest):
        self.check_grad(['X'], 'Out')

    def init_test_case(self):
        self.shape = [2, 3, 4, 4]
        self.axis = 1
        self.epsilon = 1e-8


class TestNormOp2(TestNormOp):
    def init_test_case(self):
        self.shape = [5, 3, 9, 7]
        self.axis = 0
        self.epsilon = 1e-8


class TestNormOp3(TestNormOp):
    def init_test_case(self):
        self.shape = [5, 3, 2, 7]
        self.axis = -1
        self.epsilon = 1e-8


if __name__ == '__main__':
......
...@@ -70,8 +70,9 @@ class TestNormalization(unittest.TestCase):
    def l2_normalize(self, data, axis, epsilon):
        """ Compute the groundtruth.
        """
        output = data / np.broadcast_to(
            np.sqrt(np.sum(np.square(data), axis=axis, keepdims=True)),
            data.shape)
        return output

    def test_l2_normalize(self):
......
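The fix above adds the missing square root to the groundtruth. A quick NumPy check (not part of the commit) that the corrected formula yields unit-length vectors along the normalized axis:

```python
import numpy as np

data = np.random.random((2, 3)).astype("float32")
out = data / np.broadcast_to(
    np.sqrt(np.sum(np.square(data), axis=1, keepdims=True)), data.shape)
print(np.linalg.norm(out, axis=1))  # ~[1. 1.]
```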
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
from op_test import OpTest
class TestSliceOp(OpTest):
def setUp(self):
self.op_type = "slice"
self.config()
self.inputs = {'Input': self.input}
self.outputs = {'Out': self.out}
self.attrs = {
'axes': self.axes,
'starts': self.starts,
'ends': self.ends
}
def config(self):
self.input = np.random.random([3, 4, 5, 6]).astype("float32")
self.starts = [1, 0, 2]
self.ends = [3, 3, 4]
self.axes = [0, 1, 2]
self.out = self.input[1:3, 0:3, 2:4, :]
def test_check_output(self):
self.check_output()
class TestCase1(TestSliceOp):
def config(self):
self.input = np.random.random([3, 4, 5, 6]).astype("float32")
self.starts = [-3, 0, 2]
self.ends = [3, 100, -1]
self.axes = [0, 1, 2]
self.out = self.input[-3:3, 0:100, 2:-1, :]
class TestCase2(TestSliceOp):
def config(self):
self.input = np.random.random([3, 4, 5, 6]).astype("float32")
self.starts = [-3, 0, 2]
self.ends = [3, 100, -1]
self.axes = [0, 1, 3]
self.out = self.input[-3:3, 0:100, :, 2:-1]
if __name__ == '__main__':
unittest.main()
...@@ -515,35 +515,38 @@ class DistributeTranspiler:
                                                grad_to_block_id, None)

        # process distributed lookup_table
        prefetch_var_name_to_block_id = []
        if self.has_distributed_lookup_table:
            pserver_index = self.pserver_endpoints.index(endpoint)
            table_opt_block = self._create_table_optimize_block(
                pserver_index, pserver_program, pre_block_idx, grad_to_block_id)
            prefetch_var_name_to_block_id = self._create_prefetch_block(
                pserver_index, pserver_program, table_opt_block)

        # NOTE: if has_distributed_lookup_table is False,
        # prefetch_var_name_to_block_id stays empty and no prefetch attr is set
        if self.has_distributed_lookup_table:
            assert len(prefetch_var_name_to_block_id) > 0
        else:
            assert len(prefetch_var_name_to_block_id) == 0

        attrs = {
            "OptimizeBlock": pserver_program.block(1),
            "endpoint": endpoint,
            "Fanin": self.trainer_num,
            "sync_mode": self.sync_mode,
            "grad_to_block_id": grad_to_block_id
        }
        if len(prefetch_var_name_to_block_id) > 0:
            attrs['prefetch_var_name_to_block_id'] \
                = prefetch_var_name_to_block_id

        # step5 append the listen_and_serv op
        pserver_program.global_block().append_op(
            type="listen_and_serv",
            inputs={'X': recv_inputs},
            outputs={},
            attrs=attrs)

        pserver_program.sync_with_cpp()
        return pserver_program
...@@ -608,8 +611,15 @@ class DistributeTranspiler:
    def _replace_lookup_table_op_with_prefetch(self, program,
                                               pserver_endpoints):
        # 1. replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op
        # self.all_prefetch_input_vars =
        #       [[var0_prefetch_in_pserver0, var0_prefetch_in_pserver1]
        #        [var1_prefetch_in_pserver0, var1_prefetch_in_pserver1]]
        self.all_prefetch_input_vars = []

        # self.all_prefetch_output_vars =
        #       [[var0_prefetch_in_pserver0, var0_prefetch_in_pserver1]
        #        [var1_prefetch_in_pserver0, var1_prefetch_in_pserver1]]
        self.all_prefetch_output_vars = []

        continue_search_lookup_table_op = True
        while continue_search_lookup_table_op:
...@@ -623,18 +633,19 @@ class DistributeTranspiler:
                    ids_name = op.input("Ids")
                    out_name = op.output("Out")

                    ids_var = program.global_block().vars[ids_name[0]]
                    prefetch_input_vars = self.create_splited_vars(
                        source_var=ids_var,
                        block=program.global_block(),
                        tag="_prefetch_in_")
                    self.all_prefetch_input_vars.append(prefetch_input_vars)

                    out_var = program.global_block().vars[out_name[0]]
                    prefetch_output_vars = self.create_splited_vars(
                        source_var=out_var,
                        block=program.global_block(),
                        tag="_prefetch_out_")
                    self.all_prefetch_output_vars.append(prefetch_output_vars)

                    # insert split_ids_op
                    program.global_block().insert_op(
...@@ -646,14 +657,14 @@ class DistributeTranspiler:
                                for varname in ids_name
                            ]
                        },
                        outputs={"Out": prefetch_input_vars})

                    # insert prefetch_op
                    program.global_block().insert_op(
                        index=lookup_table_op_index + 1,
                        type="prefetch",
                        inputs={'X': prefetch_input_vars},
                        outputs={"Out": prefetch_output_vars},
                        attrs={
                            "epmap": pserver_endpoints,
                            RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
...@@ -668,7 +679,7 @@ class DistributeTranspiler:
                                program.global_block().vars[varname]
                                for varname in ids_name
                            ],
                            'X': prefetch_output_vars
                        },
                        outputs={
                            "Out": [
...@@ -714,14 +725,16 @@ class DistributeTranspiler:
                                 optimize_block):
        # STEP: create prefetch block
        table_var = pserver_program.global_block().vars[self.table_name]
        prefetch_var_name_to_block_id = []
        for index in range(len(self.all_prefetch_input_vars)):
            prefetch_block = pserver_program.create_block(optimize_block.idx)
            trainer_ids = self.all_prefetch_input_vars[index][pserver_index]
            pserver_ids = pserver_program.global_block().create_var(
                name=trainer_ids.name,
                type=trainer_ids.type,
                shape=trainer_ids.shape,
                dtype=trainer_ids.dtype)
            trainer_out = self.all_prefetch_output_vars[index][pserver_index]
            pserver_out = pserver_program.global_block().create_var(
                name=trainer_out.name,
                type=trainer_out.type,
...@@ -737,7 +750,9 @@ class DistributeTranspiler:
                    "is_distributed": True,
                    "padding_idx": -1
                })
            prefetch_var_name_to_block_id.append(trainer_ids.name + ":" + str(
                prefetch_block.idx))

        return prefetch_var_name_to_block_id

    def _create_table_optimize_block(self, pserver_index, pserver_program,
                                     pre_block_idx, grad_to_block_id):
......
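To make the new encoding concrete, here is a small illustrative sketch (all variable names and block ids are hypothetical, not from the commit) of how the transpiler pairs each prefetch input variable with the pserver block that serves it via "var_name:block_id" strings:

```python
# Hypothetical example: two distributed lookup tables, one pserver (index 0).
all_prefetch_input_vars = [
    ["ids0_prefetch_in_pserver0"],  # split vars for the first lookup_table
    ["ids1_prefetch_in_pserver0"],  # split vars for the second lookup_table
]

prefetch_var_name_to_block_id = []
for index, per_pserver_vars in enumerate(all_prefetch_input_vars):
    block_id = 2 + index  # assume prefetch blocks were created as blocks 2, 3, ...
    trainer_ids_name = per_pserver_vars[0]  # this pserver's shard of the ids
    prefetch_var_name_to_block_id.append(
        trainer_ids_name + ":" + str(block_id))

# listen_and_serv can recover the mapping by splitting each entry on ":".
mapping = dict(entry.split(":") for entry in prefetch_var_name_to_block_id)
print(mapping)  # {'ids0_prefetch_in_pserver0': '2', 'ids1_prefetch_in_pserver0': '3'}
```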
...@@ -126,7 +126,8 @@ class DocstringChecker(BaseChecker):
        'W9002':
        ('Doc string does not end with "." period', symbol + "-end-with",
         'Used when a doc string does not end with a period'),
        'W9003':
        ('All args with their types must be mentioned in doc string %s',
         symbol + "-with-all-args",
         'Used when not all arguments are in the doc string '),
        'W9005': ('Missing docstring or docstring is too short',
...@@ -178,6 +179,8 @@ class DocstringChecker(BaseChecker):
        self.indent_style(node)

    def missing_doc_string(self, node):
        if node.name.startswith("__") or node.name.startswith("_"):
            return True
        if node.tolineno - node.fromlineno <= 10:
            return True
...@@ -199,12 +202,16 @@ class DocstringChecker(BaseChecker):
        doc = node.doc
        lines = doc.splitlines()

        # Iterate with enumerate so the first line is skipped without
        # stalling the counter (a bare `continue` would never advance it).
        for line_num, l in enumerate(lines):
            if line_num == 0:
                continue
            cur_indent = len(l) - len(l.lstrip())
            if cur_indent % indent != 0:
                self.add_message('W9006', node=node, line=node.fromlineno)
                return False

        return True
...@@ -320,15 +327,19 @@ class DocstringChecker(BaseChecker):
            return True

        parsed_args = doc.args
        args_not_documented = set(args) - set(parsed_args)
        if len(args) > 0 and len(parsed_args) <= 0:
            self.add_message(
                'W9003',
                node=node,
                line=node.fromlineno,
                args=list(args_not_documented))
            return False

        for t in args:
            if t not in parsed_args:
                self.add_message(
                    'W9003', node=node, line=node.fromlineno, args=[t, ])
                return False

        return True
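For reference, a docstring that would satisfy the all-args check above might look like the following sketch (assuming the checker parses Google-style `Args:` sections; not from the commit):

```python
def scale(value, factor):
    """Scale a value by a constant factor.

    Args:
        value (float): the number to scale.
        factor (float): the multiplier applied to value.

    Returns:
        float: value multiplied by factor.
    """
    return value * factor
```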
...@@ -7,13 +7,13 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
export PYTHONPATH=$DIR:$PYTHONPATH

# The trick to remove deleted files: https://stackoverflow.com/a/2413151
for file in $(git diff --name-status | awk '$1 != "D" {print $2}'); do
    pylint --disable=all --load-plugins=docstring_checker \
        --enable=doc-string-one-line,doc-string-end-with,doc-string-with-all-args,doc-string-triple-quotes,doc-string-missing,doc-string-indent-error,doc-string-with-returns,doc-string-with-raises $file;
    TOTAL_ERRORS=$(expr $TOTAL_ERRORS + $?);
done

exit $TOTAL_ERRORS
#For now, just warning:
#exit 0