提交 fb7e4791 编写于 作者: T tangwei12

merger paddle develop

...@@ -76,7 +76,8 @@ RUN easy_install -U pip && \ ...@@ -76,7 +76,8 @@ RUN easy_install -U pip && \
pip install sphinx-rtd-theme==0.1.9 recommonmark pip install sphinx-rtd-theme==0.1.9 recommonmark
RUN pip install pre-commit 'ipython==5.3.0' && \ RUN pip install pre-commit 'ipython==5.3.0' && \
pip install 'ipykernel==4.6.0' 'jupyter==1.0.0' pip install 'ipykernel==4.6.0' 'jupyter==1.0.0' && \
pip install opencv-python
#For docstring checker #For docstring checker
RUN pip install pylint pytest astroid isort RUN pip install pylint pytest astroid isort
......
...@@ -54,7 +54,7 @@ ExternalProject_Add( ...@@ -54,7 +54,7 @@ ExternalProject_Add(
${EXTERNAL_PROJECT_LOG_ARGS} ${EXTERNAL_PROJECT_LOG_ARGS}
DEPENDS ${MKLDNN_DEPENDS} DEPENDS ${MKLDNN_DEPENDS}
GIT_REPOSITORY "https://github.com/01org/mkl-dnn.git" GIT_REPOSITORY "https://github.com/01org/mkl-dnn.git"
GIT_TAG "db3424ad44901513c03a1ea31ccaacdf633fbe9f" GIT_TAG "a29d8487a63afca3d5b8c5bbdbb473cf8ccc6e51"
PREFIX ${MKLDNN_SOURCES_DIR} PREFIX ${MKLDNN_SOURCES_DIR}
UPDATE_COMMAND "" UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${MKLDNN_INSTALL_DIR} CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${MKLDNN_INSTALL_DIR}
......
...@@ -213,3 +213,12 @@ virtualenv本身也是Python的一个包,可以用pip进行安装: ...@@ -213,3 +213,12 @@ virtualenv本身也是Python的一个包,可以用pip进行安装:
保存并关闭文件。 保存并关闭文件。
这样,每次打开终端时就会自动启动名为‘paddle’的Python环境了。 这样,每次打开终端时就会自动启动名为‘paddle’的Python环境了。
10. 通过pip安装的PaddlePaddle在 :code:`import paddle.fluid` 报找不到 :code:`libmkldnn.so` 或 :code:`libmklml_intel.so`
------------------------------------------------------------------------------------------
出现这种问题的原因是在导入 :code:`paddle.fluid` 时需要加载 :code:`libmkldnn.so` 和 :code:`libmklml_intel.so`,
但是系统没有找到该文件。一般通过pip安装PaddlePaddle时会将 :code:`libmkldnn.so` 和 :code:`libmklml_intel.so`
拷贝到 :code:`/usr/local/lib` 路径下,所以解决办法是将该路径加到 :code:`LD_LIBRARY_PATH` 环境变量下,
即: :code:`export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH` 。
**注意**:如果是在虚拟环境中安装PaddlePaddle, :code:`libmkldnn.so` 和 :code:`libmklml_intel.so` 可能不在 :code:`/usr/local/lib` 路径下。
\ No newline at end of file
# Inference High-level APIs
This document describes the high-level inference APIs one can use to easily deploy a Paddle model for an application.
The APIs are described in `paddle_inference_api.h`, just one header file, and two libaries `libpaddle_fluid.so` and `libpaddle_fluid_api.so` are needed.
## PaddleTensor
We provide the `PaddleTensor` data structure is to give a general tensor interface.
The definition is
```c++
struct PaddleTensor {
std::string name; // variable name.
std::vector<int> shape;
PaddleBuf data; // blob of data.
PaddleDType dtype;
};
```
The data is stored in a continuous memory `PaddleBuf`, and tensor's data type is specified by a `PaddleDType`.
The `name` field is used to specify the name of input variable,
that is important when there are multiple inputs and need to distiuish which variable to set.
## engine
The inference APIs has two different underlying implementation, currently there are two valid engines:
- the native engine, which is consists of the native operators and framework,
- the Anakin engine, which is a Anakin library embeded.
The native engine takes a native Paddle model as input, and supports any model that trained by Paddle,
but the Anakin engine can only take the Anakin model as input(user need to manully transform the format first) and currently not all Paddle models are supported.
```c++
enum class PaddleEngineKind {
kNative = 0, // Use the native Fluid facility.
kAnakin, // Use Anakin for inference.
};
```
## PaddlePredictor and how to create one
The main interface is `PaddlePredictor`, there are following methods
- `bool Run(const std::vector<PaddleTensor>& inputs, std::vector<PaddleTensor>* output_data)`
- take inputs and output `output_data`
- `Clone` to clone a predictor from an existing one, with model parameter shared.
There is a factory method to help create a predictor, and the user takes the ownership of this object.
```c++
template <typename ConfigT, PaddleEngineKind engine = PaddleEngineKind::kNative>
std::unique_ptr<PaddlePredictor> CreatePaddlePredictor(const ConfigT& config);
```
By specifying the engine kind and config, one can get an specific implementation.
## Reference
- [paddle_inference_api.h](./paddle_inference_api.h)
- [demos](./demo)
...@@ -109,8 +109,7 @@ class PaddlePredictor { ...@@ -109,8 +109,7 @@ class PaddlePredictor {
// The common configs for all the predictors. // The common configs for all the predictors.
struct Config { struct Config {
std::string model_dir; // path to the model directory. std::string model_dir; // path to the model directory.
bool enable_engine{false}; // Enable to execute (part of) the model on
}; };
}; };
......
...@@ -73,6 +73,9 @@ void BroadcastOpHandle::RunImpl() { ...@@ -73,6 +73,9 @@ void BroadcastOpHandle::RunImpl() {
int root_id = boost::get<platform::CUDAPlace>(in_tensor.place()).device; int root_id = boost::get<platform::CUDAPlace>(in_tensor.place()).device;
std::vector<std::function<void()>> broadcast_calls; std::vector<std::function<void()>> broadcast_calls;
int type = platform::ToNCCLDataType(in_tensor.type());
size_t numel = static_cast<size_t>(in_tensor.numel());
for (auto out_var_handle : out_var_handles) { for (auto out_var_handle : out_var_handles) {
Variable *out_var = var_scopes.at(out_var_handle->scope_idx_) Variable *out_var = var_scopes.at(out_var_handle->scope_idx_)
->FindVar(out_var_handle->name_); ->FindVar(out_var_handle->name_);
...@@ -87,13 +90,11 @@ void BroadcastOpHandle::RunImpl() { ...@@ -87,13 +90,11 @@ void BroadcastOpHandle::RunImpl() {
send_recv_buffer = const_cast<void *>(in_tensor.data<void>()); send_recv_buffer = const_cast<void *>(in_tensor.data<void>());
out_handle = out_var_handle; out_handle = out_var_handle;
} else { } else {
send_recv_buffer = send_recv_buffer = VariableVisitor::GetMutableTensor(out_var)
VariableVisitor::GetMutableTensor(out_var).mutable_data( .Resize(in_tensor.dims())
out_var_handle->place_); .mutable_data(out_var_handle->place_);
} }
int type = platform::ToNCCLDataType(in_tensor.type());
size_t numel = static_cast<size_t>(in_tensor.numel());
broadcast_calls.emplace_back( broadcast_calls.emplace_back(
[send_recv_buffer, numel, type, root_id, &nccl_ctx] { [send_recv_buffer, numel, type, root_id, &nccl_ctx] {
PADDLE_ENFORCE(platform::dynload::ncclBcast( PADDLE_ENFORCE(platform::dynload::ncclBcast(
......
...@@ -351,7 +351,7 @@ void MultiDevSSAGraphBuilder::InsertAllReduceOp(SSAGraph *result, ...@@ -351,7 +351,7 @@ void MultiDevSSAGraphBuilder::InsertAllReduceOp(SSAGraph *result,
auto &prev_grad = vars.back(); auto &prev_grad = vars.back();
op_handle->AddInput(prev_grad.get()); op_handle->AddInput(prev_grad.get());
auto var = new VarHandle(vars.size() - 1, i, og, p); auto var = new VarHandle(vars.size(), i, og, p);
vars.emplace_back(var); vars.emplace_back(var);
op_handle->AddOutput(var); op_handle->AddOutput(var);
} }
...@@ -447,8 +447,7 @@ VarHandle *MultiDevSSAGraphBuilder::CreateReduceOp(SSAGraph *result, ...@@ -447,8 +447,7 @@ VarHandle *MultiDevSSAGraphBuilder::CreateReduceOp(SSAGraph *result,
op_handle->AddInput(prev_grad.get()); op_handle->AddInput(prev_grad.get());
} }
auto &vars = result->vars_[dst_dev_id][og]; auto &vars = result->vars_[dst_dev_id][og];
auto var = auto var = new VarHandle(vars.size(), dst_dev_id, og, places_[dst_dev_id]);
new VarHandle(vars.size() - 1, dst_dev_id, og, places_[dst_dev_id]);
vars.emplace_back(var); vars.emplace_back(var);
op_handle->AddOutput(var); op_handle->AddOutput(var);
return var; return var;
......
...@@ -47,7 +47,7 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { ...@@ -47,7 +47,7 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
#endif #endif
std::unique_ptr<SSAGraph> Build(const ProgramDesc &program) const override; std::unique_ptr<SSAGraph> Build(const ProgramDesc &program) const override;
int GetVarDeviceID(const std::string &varname) const; int GetVarDeviceID(const std::string &varname) const override;
private: private:
void CreateOpHandleIOs(SSAGraph *result, const OpDesc &op, void CreateOpHandleIOs(SSAGraph *result, const OpDesc &op,
......
...@@ -11,8 +11,8 @@ ...@@ -11,8 +11,8 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include "paddle/fluid/framework/details/op_handle_base.h" #include "paddle/fluid/framework/details/op_handle_base.h"
#include <map>
namespace paddle { namespace paddle {
namespace framework { namespace framework {
...@@ -122,11 +122,16 @@ void OpHandleBase::RunAndRecordEvent(const std::function<void()> &callback) { ...@@ -122,11 +122,16 @@ void OpHandleBase::RunAndRecordEvent(const std::function<void()> &callback) {
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
if (!events_.empty()) { // Use event if (!events_.empty()) { // Use event
std::function<void()> method = callback; std::function<void()> method = callback;
// NOTE(zcd): device context must be ordered here because RecordEvent
// will use a mutex to ensure the safe of multi-threads.
std::map<platform::DeviceContext *, platform::Place> ordered_ctxes;
for (auto &p : dev_ctxes_) { for (auto &p : dev_ctxes_) {
ordered_ctxes.emplace(p.second, p.first);
}
for (auto &p : ordered_ctxes) {
method = [method, p, this]() { method = [method, p, this]() {
static_cast<platform::CUDADeviceContext *>(p.second)->RecordEvent( static_cast<platform::CUDADeviceContext *>(p.first)->RecordEvent(
events_.at(boost::get<platform::CUDAPlace>(p.first).device), events_.at(boost::get<platform::CUDAPlace>(p.second).device),
method); method);
}; };
} }
......
...@@ -27,6 +27,7 @@ enum AttrType { ...@@ -27,6 +27,7 @@ enum AttrType {
BOOLEANS = 7; BOOLEANS = 7;
BLOCK = 8; BLOCK = 8;
LONG = 9; LONG = 9;
BLOCKS = 10;
} }
// OpDesc describes an instance of a C++ framework::OperatorBase // OpDesc describes an instance of a C++ framework::OperatorBase
...@@ -46,6 +47,7 @@ message OpDesc { ...@@ -46,6 +47,7 @@ message OpDesc {
repeated bool bools = 11; repeated bool bools = 11;
optional int32 block_idx = 12; optional int32 block_idx = 12;
optional int64 l = 13; optional int64 l = 13;
repeated int32 blocks_idx = 14;
}; };
message Var { message Var {
......
...@@ -51,8 +51,6 @@ std::ostream &operator<<(std::ostream &os, const LoD &lod) { ...@@ -51,8 +51,6 @@ std::ostream &operator<<(std::ostream &os, const LoD &lod) {
} }
std::ostream &operator<<(std::ostream &os, const LoDTensor &t) { std::ostream &operator<<(std::ostream &os, const LoDTensor &t) {
PADDLE_ENFORCE(t.type().hash_code() == typeid(float).hash_code());
if (!platform::is_cpu_place(t.place())) { if (!platform::is_cpu_place(t.place())) {
LoDTensor tt; LoDTensor tt;
framework::TensorCopy(t, platform::CPUPlace(), &tt); framework::TensorCopy(t, platform::CPUPlace(), &tt);
...@@ -70,7 +68,13 @@ std::ostream &operator<<(std::ostream &os, const LoDTensor &t) { ...@@ -70,7 +68,13 @@ std::ostream &operator<<(std::ostream &os, const LoDTensor &t) {
// only print first ten elements // only print first ten elements
int64_t size = t.numel() < 10 ? t.numel() : 10; int64_t size = t.numel() < 10 ? t.numel() : 10;
for (int64_t i = 0; i < size; ++i) { for (int64_t i = 0; i < size; ++i) {
os << t.data<float>()[i] << " "; if (t.type().hash_code() == typeid(float).hash_code()) {
os << t.data<float>()[i] << " ";
} else if (t.type().hash_code() == typeid(int64_t).hash_code()) {
os << t.data<int64_t>()[i] << " ";
} else {
PADDLE_THROW("LoDTensor data type not in [float, int64_t]");
}
} }
return os; return os;
......
...@@ -26,6 +26,20 @@ ...@@ -26,6 +26,20 @@
namespace paddle { namespace paddle {
namespace framework { namespace framework {
TEST(LoD, PrintLoDTensor) {
LoDTensor tensor1;
tensor1.mutable_data<float>(platform::CPUPlace());
tensor1.data<float>()[0] = 0.2;
tensor1.data<float>()[1] = 0.5;
LOG(INFO) << tensor1;
LoDTensor tensor2;
tensor2.mutable_data<int64_t>(platform::CPUPlace());
tensor2.data<int64_t>()[0] = 1;
tensor2.data<int64_t>()[1] = 2;
LOG(INFO) << tensor2;
}
TEST(LoD, data) { TEST(LoD, data) {
LoD lod{{0, 1, 2}}; LoD lod{{0, 1, 2}};
lod.push_back({0, 2, 4, 5}); lod.push_back({0, 2, 4, 5});
...@@ -37,7 +51,7 @@ TEST(LoD, data) { ...@@ -37,7 +51,7 @@ TEST(LoD, data) {
} }
} }
TEST(LodExpand, test) { TEST(LoD, ExpandLoD) {
LoD lod{{0, 2}}; LoD lod{{0, 2}};
LoDTensor tensor; LoDTensor tensor;
tensor.set_lod(lod); tensor.set_lod(lod);
......
...@@ -211,6 +211,12 @@ void OpDesc::SetBlockAttr(const std::string &name, BlockDesc *block) { ...@@ -211,6 +211,12 @@ void OpDesc::SetBlockAttr(const std::string &name, BlockDesc *block) {
need_update_ = true; need_update_ = true;
} }
void OpDesc::SetBlocksAttr(const std::string &name,
std::vector<BlockDesc *> blocks) {
this->attrs_[name] = blocks;
need_update_ = true;
}
void OpDesc::SetAttrMap( void OpDesc::SetAttrMap(
const std::unordered_map<std::string, Attribute> &attr_map) { const std::unordered_map<std::string, Attribute> &attr_map) {
attrs_ = attr_map; attrs_ = attr_map;
...@@ -305,6 +311,13 @@ struct SetAttrDescVisitor : public boost::static_visitor<void> { ...@@ -305,6 +311,13 @@ struct SetAttrDescVisitor : public boost::static_visitor<void> {
void operator()(const std::vector<bool> &v) const { void operator()(const std::vector<bool> &v) const {
VectorToRepeated(v, attr_->mutable_bools()); VectorToRepeated(v, attr_->mutable_bools());
} }
void operator()(const std::vector<BlockDesc *> &v) const {
std::vector<int> blocks_idx;
for (auto blk : v) {
blocks_idx.push_back(blk->ID());
}
VectorToRepeated(blocks_idx, attr_->mutable_blocks_idx());
}
void operator()(BlockDesc *desc) const { attr_->set_block_idx(desc->ID()); } void operator()(BlockDesc *desc) const { attr_->set_block_idx(desc->ID()); }
void operator()(int64_t v) const { attr_->set_l(v); } void operator()(int64_t v) const { attr_->set_l(v); }
void operator()(boost::blank) const { PADDLE_THROW("Unexpected branch"); } void operator()(boost::blank) const { PADDLE_THROW("Unexpected branch"); }
......
...@@ -77,6 +77,8 @@ class OpDesc { ...@@ -77,6 +77,8 @@ class OpDesc {
void SetBlockAttr(const std::string &name, BlockDesc *block); void SetBlockAttr(const std::string &name, BlockDesc *block);
void SetBlocksAttr(const std::string &name, std::vector<BlockDesc *> blocks);
Attribute GetAttr(const std::string &name) const; Attribute GetAttr(const std::string &name) const;
Attribute GetNullableAttr(const std::string &name) const; Attribute GetNullableAttr(const std::string &name) const;
......
...@@ -121,7 +121,7 @@ ParallelExecutor::ParallelExecutor( ...@@ -121,7 +121,7 @@ ParallelExecutor::ParallelExecutor(
#endif #endif
} }
builder_ = std::move(builder_factory.Create()); builder_ = builder_factory.Create();
member_->executor_.reset(new details::ThreadedSSAGraphExecutor( member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, places, exec_strategy, member_->local_scopes_, places,
builder_->Build(main_program))); builder_->Build(main_program)));
......
...@@ -35,7 +35,8 @@ using VariableNameMap = std::map<std::string, std::vector<std::string>>; ...@@ -35,7 +35,8 @@ using VariableNameMap = std::map<std::string, std::vector<std::string>>;
using Attribute = using Attribute =
boost::variant<boost::blank, int, float, std::string, std::vector<int>, boost::variant<boost::blank, int, float, std::string, std::vector<int>,
std::vector<float>, std::vector<std::string>, bool, std::vector<float>, std::vector<std::string>, bool,
std::vector<bool>, BlockDesc*, int64_t>; std::vector<bool>, BlockDesc*, int64_t,
std::vector<BlockDesc*>>;
using AttributeMap = std::unordered_map<std::string, Attribute>; using AttributeMap = std::unordered_map<std::string, Attribute>;
......
...@@ -18,6 +18,7 @@ limitations under the License. */ ...@@ -18,6 +18,7 @@ limitations under the License. */
#include <limits> #include <limits>
#include "glog/logging.h" // For VLOG
#include "paddle/fluid/framework/threadpool.h" #include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/operators/distributed/request_handler.h" #include "paddle/fluid/operators/distributed/request_handler.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
...@@ -75,6 +76,9 @@ bool GRPCClient::AsyncSendVar(const std::string& ep, ...@@ -75,6 +76,9 @@ bool GRPCClient::AsyncSendVar(const std::string& ep,
var_h.scope = p_scope; var_h.scope = p_scope;
var_h.name = var_name_val; var_h.name = var_name_val;
var_h.ctx = p_ctx; var_h.ctx = p_ctx;
var_h.method = "Send";
VLOG(3) << var_h.String() << " begin";
// stub context // stub context
SendProcessor* s = new SendProcessor(ch); SendProcessor* s = new SendProcessor(ch);
...@@ -129,6 +133,9 @@ bool GRPCClient::AsyncGetVar(const std::string& ep, ...@@ -129,6 +133,9 @@ bool GRPCClient::AsyncGetVar(const std::string& ep,
var_h.scope = p_scope; var_h.scope = p_scope;
var_h.name = var_name_val; var_h.name = var_name_val;
var_h.ctx = p_ctx; var_h.ctx = p_ctx;
var_h.method = "Get";
VLOG(3) << var_h.String() << " begin";
// stub context // stub context
GetProcessor* s = new GetProcessor(ch); GetProcessor* s = new GetProcessor(ch);
...@@ -172,6 +179,9 @@ bool GRPCClient::AsyncPrefetchVar(const std::string& ep, ...@@ -172,6 +179,9 @@ bool GRPCClient::AsyncPrefetchVar(const std::string& ep,
var_h.scope = p_scope; var_h.scope = p_scope;
var_h.name = out_var_name_val; var_h.name = out_var_name_val;
var_h.ctx = p_ctx; var_h.ctx = p_ctx;
var_h.method = "Prefetch";
VLOG(3) << var_h.String() << " begin";
// stub context // stub context
GetProcessor* s = new GetProcessor(ch); GetProcessor* s = new GetProcessor(ch);
...@@ -260,10 +270,11 @@ void GRPCClient::Proceed() { ...@@ -260,10 +270,11 @@ void GRPCClient::Proceed() {
GPR_ASSERT(ok); GPR_ASSERT(ok);
PADDLE_ENFORCE(c); PADDLE_ENFORCE(c);
if (c->status_.ok()) { if (c->status_.ok()) {
VLOG(3) << c->var_h_.String() << " process";
c->Process(); c->Process();
} else { } else {
LOG(FATAL) << "var: " << c->var_h_.String() LOG(FATAL) << c->var_h_.String()
<< " grpc error:" << c->status_.error_message(); << " meets grpc error:" << c->status_.error_message();
} }
delete c; delete c;
{ {
......
...@@ -47,14 +47,18 @@ namespace operators { ...@@ -47,14 +47,18 @@ namespace operators {
namespace distributed { namespace distributed {
struct VarHandle { struct VarHandle {
// RPC endpoint.
std::string ep; std::string ep;
const platform::DeviceContext* ctx; const platform::DeviceContext* ctx;
const framework::Scope* scope; const framework::Scope* scope;
// Variable name.
std::string name; std::string name;
// RPC method name.
std::string method;
std::string String() const { std::string String() const {
std::ostringstream s; std::ostringstream s;
s << "name:[" << name << "] ep:[" << ep << "]"; s << method << " name:[" << name << "], ep:[" << ep << "]";
return s.str(); return s.str();
} }
}; };
......
...@@ -41,6 +41,19 @@ class RequestBase { ...@@ -41,6 +41,19 @@ class RequestBase {
virtual ~RequestBase() {} virtual ~RequestBase() {}
virtual void Process() = 0; virtual void Process() = 0;
std::string Status2String(const std::string& method) {
std::string status = "Process";
if (status_ == FINISH) {
status = "Finish";
}
std::ostringstream s;
s << method << " name:[" << GetReqName() << "]"
<< ", ep:[" << ctx_.peer() << "]"
<< " " << status << " using req_id:" << req_id_;
return s.str();
}
CallStatus Status() const { CallStatus Status() const {
std::lock_guard<std::mutex> l(status_mu_); std::lock_guard<std::mutex> l(status_mu_);
return status_; return status_;
...@@ -312,7 +325,7 @@ void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name, ...@@ -312,7 +325,7 @@ void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name,
int req_id) { int req_id) {
std::unique_lock<std::mutex> lock(cq_mutex_); std::unique_lock<std::mutex> lock(cq_mutex_);
if (is_shut_down_) { if (is_shut_down_) {
VLOG(3) << "shutdown, do not TryToRegisterNewSendOne"; LOG(WARNING) << "shutdown, do not TryToRegisterNewSendOne";
return; return;
} }
...@@ -348,14 +361,14 @@ void AsyncGRPCServer::HandleRequest( ...@@ -348,14 +361,14 @@ void AsyncGRPCServer::HandleRequest(
bool ok = false; bool ok = false;
while (true) { while (true) {
VLOG(3) << "HandleRequest " << rpc_name << " wait next"; VLOG(4) << "HandleRequest " << rpc_name << " wait next";
if (!cq->Next(&tag, &ok)) { if (!cq->Next(&tag, &ok)) {
VLOG(3) << "CompletionQueue " << rpc_name << " shutdown!"; VLOG(3) << "CompletionQueue " << rpc_name << " shutdown!";
break; break;
} }
int req_id = static_cast<int>(reinterpret_cast<intptr_t>(tag)); int req_id = static_cast<int>(reinterpret_cast<intptr_t>(tag));
VLOG(3) << "HandleRequest " << rpc_name << ", req_id:" << req_id VLOG(4) << "HandleRequest " << rpc_name << ", req_id:" << req_id
<< " get next"; << " get next";
auto& reqs = rpc_reqs_[rpc_name]; auto& reqs = rpc_reqs_[rpc_name];
...@@ -366,22 +379,21 @@ void AsyncGRPCServer::HandleRequest( ...@@ -366,22 +379,21 @@ void AsyncGRPCServer::HandleRequest(
base = reqs[req_id]; base = reqs[req_id];
} }
VLOG(3) << base->Status2String(rpc_name);
// reference: // reference:
// https://github.com/tensorflow/tensorflow/issues/5596 // https://github.com/tensorflow/tensorflow/issues/5596
// https://groups.google.com/forum/#!topic/grpc-io/xftlRy-IQwM // https://groups.google.com/forum/#!topic/grpc-io/xftlRy-IQwM
// https://groups.google.com/forum/#!topic/grpc-io/ywATt88Ef_I // https://groups.google.com/forum/#!topic/grpc-io/ywATt88Ef_I
if (!ok) { if (!ok) {
LOG(WARNING) << "completion queue:" << rpc_name LOG(WARNING) << "completion queue:" << rpc_name
<< " recv no regular event:argument name[" << " recv no regular event"
<< base->GetReqName() << "]"; << " context:" << base->Status2String(rpc_name);
TryToRegisterNewOne(rpc_name, req_id); TryToRegisterNewOne(rpc_name, req_id);
delete base; delete base;
continue; continue;
} }
VLOG(3) << "queue id:" << rpc_name << ", req_id:" << req_id
<< ", status:" << base->Status();
switch (base->Status()) { switch (base->Status()) {
case PROCESS: { case PROCESS: {
base->Process(); base->Process();
......
...@@ -76,6 +76,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, ...@@ -76,6 +76,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
if (total_written + size_to_write > length) { if (total_written + size_to_write > length) {
size_to_write = length - total_written; size_to_write = length - total_written;
} }
// This log is useful to see how long a internal block size is of rpc.
VLOG(7) << "copy " << size_to_write << " data to CUDAPlace";
memory::Copy(boost::get<platform::CUDAPlace>(place), memory::Copy(boost::get<platform::CUDAPlace>(place),
reinterpret_cast<void*>(p), cpu, data, size_to_write, reinterpret_cast<void*>(p), cpu, data, size_to_write,
gpu_dev_ctx.stream()); gpu_dev_ctx.stream());
...@@ -103,6 +105,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, ...@@ -103,6 +105,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
} }
// TODO(gongwb): can we avoid copy? // TODO(gongwb): can we avoid copy?
platform::CPUPlace cpu; platform::CPUPlace cpu;
// This log is useful to see how long a internal block size is of rpc.
VLOG(7) << "copy " << size_to_write << " data to CPUPlace";
memory::Copy(cpu, reinterpret_cast<void*>(p), cpu, data, size_to_write); memory::Copy(cpu, reinterpret_cast<void*>(p), cpu, data, size_to_write);
p += size_to_write; p += size_to_write;
......
...@@ -102,18 +102,16 @@ void ListenAndServOp::RunSyncLoop( ...@@ -102,18 +102,16 @@ void ListenAndServOp::RunSyncLoop(
const std::vector<int> &prefetch_block_id_list, const std::vector<int> &prefetch_block_id_list,
const int checkpoint_point_block_id) const { const int checkpoint_point_block_id) const {
size_t num_blocks = program->Size(); size_t num_blocks = program->Size();
auto optimize_blocks =
Attr<std::vector<framework::BlockDesc *>>(kOptimizeBlocks);
PADDLE_ENFORCE_GE(num_blocks, 2, PADDLE_ENFORCE_GE(num_blocks, 2,
"server program should have at least 2 blocks"); "server program should have at least 2 blocks");
std::vector<int> optimize_block_id_list; std::vector<int> optimize_blocks_idx;
for (int blkid = 1; blkid < num_blocks; ++blkid) { for (auto blk : optimize_blocks) {
if (std::find(prefetch_block_id_list.begin(), prefetch_block_id_list.end(), optimize_blocks_idx.push_back(blk->ID());
blkid) == prefetch_block_id_list.end() &&
blkid != checkpoint_point_block_id) {
optimize_block_id_list.push_back(blkid);
}
} }
auto optimize_prepared = executor->Prepare(*program, optimize_block_id_list); auto optimize_prepared = executor->Prepare(*program, optimize_blocks_idx);
// Insert placeholder for block0 which holds current op itself. // Insert placeholder for block0 which holds current op itself.
optimize_prepared.insert( optimize_prepared.insert(
optimize_prepared.begin(), optimize_prepared.begin(),
...@@ -136,14 +134,14 @@ void ListenAndServOp::RunSyncLoop( ...@@ -136,14 +134,14 @@ void ListenAndServOp::RunSyncLoop(
// and this will still work. // and this will still work.
// The optimize blocks which have the same parent ID would run parallel // The optimize blocks which have the same parent ID would run parallel
// TODO(Yancey1989): need to use ParallelExecutor for future // TODO(Yancey1989): need to use ParallelExecutor for future
int32_t last_parent_blkid = program->Block(1).Parent(); int32_t last_parent_blkid = optimize_blocks[0]->Parent();
std::vector<size_t> parallel_blkids; std::vector<size_t> parallel_blkids;
parallel_blkids.push_back(1); parallel_blkids.push_back(optimize_blocks[0]->ID());
double ts = GetTimestamp(); double ts = GetTimestamp();
for (size_t i = 1; i < optimize_block_id_list.size(); ++i) { for (size_t i = 1; i < optimize_blocks.size(); ++i) {
// skip the first optimize block because it is already in the // skip the first optimize block because it is already in the
// parallel_blkids. // parallel_blkids.
int blkid = optimize_block_id_list[i]; int blkid = optimize_blocks[i]->ID();
if (program->Block(blkid).Parent() != last_parent_blkid) { if (program->Block(blkid).Parent() != last_parent_blkid) {
ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared,
program, recv_scope); program, recv_scope);
...@@ -271,8 +269,11 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, ...@@ -271,8 +269,11 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
rpc_service_->RegisterRPC(distributed::kRequestCheckpoint, rpc_service_->RegisterRPC(distributed::kRequestCheckpoint,
request_checkpoint_handler_.get()); request_checkpoint_handler_.get());
auto *optimize_block = Attr<framework::BlockDesc *>(kOptimizeBlock); auto optimize_blocks =
auto *program = optimize_block->Program(); Attr<std::vector<framework::BlockDesc *>>(kOptimizeBlocks);
PADDLE_ENFORCE(optimize_blocks.size() >= 1,
"optimize blocks should be 1 at least on the pserver side.");
auto *program = optimize_blocks[0]->Program();
framework::Executor executor(dev_place); framework::Executor executor(dev_place);
std::shared_ptr<framework::ExecutorPrepareContext> ckpt_pre_context = nullptr; std::shared_ptr<framework::ExecutorPrepareContext> ckpt_pre_context = nullptr;
...@@ -358,8 +359,9 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker { ...@@ -358,8 +359,9 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
"a map from grad name to it's optimize block id") "a map from grad name to it's optimize block id")
.SetDefault({}); .SetDefault({});
AddAttr<bool>("sync_mode", "if works at sync_mode or not").SetDefault(true); AddAttr<bool>("sync_mode", "if works at sync_mode or not").SetDefault(true);
AddAttr<framework::BlockDesc *>(kOptimizeBlock, AddAttr<std::vector<framework::BlockDesc *>>(
"BlockID to run on server side."); kOptimizeBlocks, "Optimize blocks to run on server side.")
.SetDefault({});
AddAttr<std::vector<std::string>>(kPrefetchVarNameToBlockId, AddAttr<std::vector<std::string>>(kPrefetchVarNameToBlockId,
"prefetch blocks to run on server side.") "prefetch blocks to run on server side.")
.SetDefault({}); .SetDefault({});
......
...@@ -30,7 +30,7 @@ limitations under the License. */ ...@@ -30,7 +30,7 @@ limitations under the License. */
namespace paddle { namespace paddle {
namespace operators { namespace operators {
constexpr char kOptimizeBlock[] = "OptimizeBlock"; constexpr char kOptimizeBlocks[] = "optimize_blocks";
constexpr char kPrefetchVarNameToBlockId[] = "prefetch_var_name_to_block_id"; constexpr char kPrefetchVarNameToBlockId[] = "prefetch_var_name_to_block_id";
constexpr char kCheckpointBlockId[] = "checkpint_block_id"; constexpr char kCheckpointBlockId[] = "checkpint_block_id";
......
...@@ -129,7 +129,10 @@ void StartServerNet(bool is_sparse, std::atomic<bool> *initialized) { ...@@ -129,7 +129,10 @@ void StartServerNet(bool is_sparse, std::atomic<bool> *initialized) {
// sub program run in listen_and_serv_op, for simple test we use sum // sub program run in listen_and_serv_op, for simple test we use sum
f::ProgramDesc program; f::ProgramDesc program;
const auto &root_block = program.Block(0); const auto &root_block = program.Block(0);
std::vector<framework::BlockDesc *> optimize_blocks;
auto *optimize_block = program.AppendBlock(root_block); auto *optimize_block = program.AppendBlock(root_block);
optimize_blocks.push_back(optimize_block);
auto *prefetch_block = program.AppendBlock(root_block); auto *prefetch_block = program.AppendBlock(root_block);
// X for server side tensors, RX for received tensors, must be of same shape. // X for server side tensors, RX for received tensors, must be of same shape.
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block, AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block,
...@@ -139,7 +142,7 @@ void StartServerNet(bool is_sparse, std::atomic<bool> *initialized) { ...@@ -139,7 +142,7 @@ void StartServerNet(bool is_sparse, std::atomic<bool> *initialized) {
attrs.insert({"Fanin", 1}); attrs.insert({"Fanin", 1});
attrs.insert({"ParamList", std::vector<std::string>({"Out"})}); attrs.insert({"ParamList", std::vector<std::string>({"Out"})});
attrs.insert({"GradList", std::vector<std::string>({"x1"})}); attrs.insert({"GradList", std::vector<std::string>({"x1"})});
attrs.insert({"OptimizeBlock", optimize_block}); attrs.insert({"optimize_blocks", optimize_blocks});
attrs.insert({"PrefetchBlock", prefetch_block}); attrs.insert({"PrefetchBlock", prefetch_block});
attrs.insert({"grad_to_block_id", std::vector<std::string>({""})}); attrs.insert({"grad_to_block_id", std::vector<std::string>({""})});
attrs.insert({"sync_mode", true}); attrs.insert({"sync_mode", true});
......
...@@ -27,8 +27,81 @@ using paddle::platform::MKLDNNMemDesc; ...@@ -27,8 +27,81 @@ using paddle::platform::MKLDNNMemDesc;
using mkldnn::memory; // Note: paddle has also "memory" namespace using mkldnn::memory; // Note: paddle has also "memory" namespace
using mkldnn::primitive; using mkldnn::primitive;
using mkldnn::softmax_forward; using mkldnn::softmax_forward;
using mkldnn::softmax_backward;
using mkldnn::prop_kind; using mkldnn::prop_kind;
using mkldnn::stream; using mkldnn::stream;
using platform::to_void_cast;
class SoftmaxMKLDNNHandler : public platform::MKLDNNHandler {
public:
SoftmaxMKLDNNHandler(
std::shared_ptr<mkldnn::softmax_forward::primitive_desc> softmax_pd,
const platform::MKLDNNDeviceContext& dev_ctx, mkldnn::engine engine,
const std::string& base_key)
: platform::MKLDNNHandler(dev_ctx, engine, base_key),
softmax_pd_(softmax_pd) {}
SoftmaxMKLDNNHandler(
std::shared_ptr<mkldnn::softmax_forward::primitive_desc> softmax_pd,
std::shared_ptr<mkldnn::softmax_backward::primitive_desc> softmax_bwd_pd,
const platform::MKLDNNDeviceContext& dev_ctx, mkldnn::engine engine,
const std::string& base_key)
: platform::MKLDNNHandler(dev_ctx, engine, base_key),
softmax_pd_(softmax_pd),
softmax_bwd_pd_(softmax_bwd_pd) {
// If we are in Grad operatgor then update a key with BWD suffix to
// distinguish from FWD memory primitives
key_ += "-BWD";
}
std::shared_ptr<mkldnn::softmax_forward> AcquireSoftmax(
std::shared_ptr<mkldnn::memory> dst_memory_p,
std::shared_ptr<mkldnn::memory> src_memory_p) {
/*Generate key*/
auto prim_key = key_ + "@softmax_p";
auto softmax_p = std::static_pointer_cast<mkldnn::softmax_forward>(
dev_ctx_.GetBlob(prim_key));
PADDLE_ENFORCE((softmax_p != nullptr) || (is_reusing_ == false),
"Fail to find softmax primitive in device context");
if (softmax_p == nullptr) {
softmax_p = std::make_shared<mkldnn::softmax_forward>(
*(softmax_pd_.get()),
*(static_cast<mkldnn::memory*>(src_memory_p.get())),
*(static_cast<mkldnn::memory*>(dst_memory_p.get())));
dev_ctx_.SetBlob(prim_key, softmax_p);
} else {
is_reusing_ = true;
}
return softmax_p;
}
std::shared_ptr<mkldnn::softmax_backward> AcquireSoftmaxBackward(
std::shared_ptr<mkldnn::memory> dst_memory_p,
std::shared_ptr<mkldnn::memory> diff_dst_memory_p,
std::shared_ptr<mkldnn::memory> diff_src_memory_p) {
auto prim_key = key_ + "@softmax_bwd_p";
auto softmax_bwd_p = std::static_pointer_cast<mkldnn::softmax_backward>(
dev_ctx_.GetBlob(prim_key));
PADDLE_ENFORCE((softmax_bwd_p != nullptr) || (is_reusing_ == false),
"Fail to find softmax backward primitive in device context");
if (softmax_bwd_p == nullptr) {
softmax_bwd_p = std::make_shared<mkldnn::softmax_backward>(
*softmax_bwd_pd_, *(dst_memory_p.get()), *(diff_dst_memory_p.get()),
*(diff_src_memory_p.get()));
dev_ctx_.SetBlob(prim_key, softmax_bwd_p);
} else {
is_reusing_ = true;
}
return softmax_bwd_p;
}
private:
std::shared_ptr<mkldnn::softmax_forward::primitive_desc> softmax_pd_;
std::shared_ptr<mkldnn::softmax_backward::primitive_desc> softmax_bwd_pd_;
};
template <typename T> template <typename T>
class SoftmaxMKLDNNKernel : public paddle::framework::OpKernel<T> { class SoftmaxMKLDNNKernel : public paddle::framework::OpKernel<T> {
...@@ -54,56 +127,27 @@ class SoftmaxMKLDNNKernel : public paddle::framework::OpKernel<T> { ...@@ -54,56 +127,27 @@ class SoftmaxMKLDNNKernel : public paddle::framework::OpKernel<T> {
// Same memory descriptor to be used for input and output // Same memory descriptor to be used for input and output
memory::dims softmax_tz = {src_tz[0], src_tz[1]}; memory::dims softmax_tz = {src_tz[0], src_tz[1]};
// Generate keys for storing/retriving primitives for this operator // Generate keys for storing/retriving primitives for this operator
// TODO(jczaja): Each MKLDNN operator may have diffrent hashing function const std::string key =
auto gethash = [](memory::dims& operand_dims) { platform::MKLDNNHandler::GetHash(softmax_tz, ctx.op().Output("Out"));
return std::string(std::to_string(operand_dims[0]) + "-" + const std::string key_softmax_pd = key + "@softmax_pd";
std::to_string(operand_dims[1]));
}; // Currently only NC data format is supported
const std::string key = gethash(softmax_tz); auto softmax_md = MKLDNNMemDesc(
const std::string key_softmax_p = key + "@softmax_p"; {softmax_tz}, platform::MKLDNNGetDataType<T>(), memory::format::nc);
const std::string key_softmax_src_mem_p = key + "@softmax_src_mem_p"; // Normalization is made after innermost dimension eg. C out of NC
const std::string key_softmax_dst_mem_p = key + "@softmax_dst_mem_p"; auto softmax_desc = softmax_forward::desc(prop_kind::forward_scoring,
softmax_md, 1 /*dim: C*/);
std::shared_ptr<void> softmax_p = dev_ctx.GetBlob(key_softmax_p); auto softmax_pd = std::make_shared<mkldnn::softmax_forward::primitive_desc>(
if (softmax_p == nullptr) { softmax_desc, mkldnn_engine);
// Currently only NC data format is supported dev_ctx.SetBlob(key_softmax_pd, softmax_pd);
auto softmax_md =
MKLDNNMemDesc({softmax_tz}, memory::f32, memory::format::nc); SoftmaxMKLDNNHandler handler(softmax_pd, dev_ctx, mkldnn_engine, key);
// Normalization is made after innermost dimension eg. C out of NC auto softmax_src_memory_p =
auto softmax_desc = softmax_forward::desc(prop_kind::forward_scoring, handler.AcquireSrcMemory(softmax_md, to_void_cast<T>(input_data));
softmax_md, 1 /*dim: C*/); auto softmax_dst_memory_p =
// create memory primitives handler.AcquireDstMemory(softmax_md, to_void_cast<T>(output_data));
auto softmax_src_memory_p = std::make_shared<memory>( auto softmax_p =
memory::primitive_desc{softmax_md, mkldnn_engine}, handler.AcquireSoftmax(softmax_dst_memory_p, softmax_src_memory_p);
static_cast<void*>(const_cast<T*>(input_data)));
dev_ctx.SetBlob(key_softmax_src_mem_p, softmax_src_memory_p);
auto softmax_dst_memory_p = std::make_shared<memory>(
memory::primitive_desc{softmax_md, mkldnn_engine},
static_cast<void*>(output_data));
dev_ctx.SetBlob(key_softmax_dst_mem_p, softmax_dst_memory_p);
auto softmax_forward_pd =
std::make_shared<softmax_forward::primitive_desc>(softmax_desc,
mkldnn_engine);
softmax_p = std::make_shared<softmax_forward>(
*(softmax_forward_pd.get()),
*(static_cast<memory*>(softmax_src_memory_p.get())),
*(static_cast<memory*>(softmax_dst_memory_p.get())));
dev_ctx.SetBlob(key_softmax_p, softmax_p);
} else {
// Primitives already exist
auto src_memory_p = std::static_pointer_cast<memory>(
dev_ctx.GetBlob(key_softmax_src_mem_p));
PADDLE_ENFORCE(src_memory_p != nullptr,
"Fail to find softmax src mem_p in device context");
auto dst_memory_p = std::static_pointer_cast<memory>(
dev_ctx.GetBlob(key_softmax_dst_mem_p));
PADDLE_ENFORCE(dst_memory_p != nullptr,
"Fail to find softmax dst mem_p in device context");
src_memory_p->set_data_handle(
reinterpret_cast<void*>(const_cast<T*>(input_data)));
dst_memory_p->set_data_handle(output_data);
}
std::vector<primitive> pipeline{ std::vector<primitive> pipeline{
*(static_cast<softmax_forward::primitive*>(softmax_p.get()))}; *(static_cast<softmax_forward::primitive*>(softmax_p.get()))};
...@@ -120,6 +164,77 @@ class SoftmaxMKLDNNKernel : public paddle::framework::OpKernel<T> { ...@@ -120,6 +164,77 @@ class SoftmaxMKLDNNKernel : public paddle::framework::OpKernel<T> {
} }
}; };
template <typename T>
class SoftmaxMKLDNNGradKernel : public paddle::framework::OpKernel<T> {
public:
void Compute(const paddle::framework::ExecutionContext& ctx) const override {
PADDLE_ENFORCE(paddle::platform::is_cpu_place(ctx.GetPlace()),
"It must use CPUPlace.");
auto& dev_ctx = ctx.template device_context<MKLDNNDeviceContext>();
auto mkldnn_engine = dev_ctx.GetEngine();
const Tensor* output = ctx.Input<Tensor>("Out");
const T* dst_data = output->data<T>();
auto* dout = ctx.template Input<Tensor>(framework::GradVarName("Out"));
const auto* diff_dst_ptr = dout->template data<T>();
auto* dx =
ctx.template Output<framework::Tensor>(framework::GradVarName("X"));
T* diff_src_ptr = dx->template mutable_data<T>(ctx.GetPlace());
std::vector<int> dst_tz = paddle::framework::vectorize2int(output->dims());
std::vector<int> src_tz(dst_tz);
PADDLE_ENFORCE(output->dims().size() == 2UL,
"The input of softmax op must be a 2D matrix.");
// MKL-DNN does support softmax over selected axis. Having 2D Tensor,
// we will make normalization after final eg. axis: 1
PADDLE_ENFORCE(((src_tz[0] == dst_tz[0]) && (src_tz[1] == dst_tz[1])),
"Softmax input and output dimensions should match");
// Same memory descriptor to be used for input and output
memory::dims softmax_tz = {src_tz[0], src_tz[1]};
// Currently only supports NC data format
// retrieve eltwise primitive desc from device context
const std::string key =
platform::MKLDNNHandler::GetHash(softmax_tz, ctx.op().Input("Out"));
const std::string key_softmax_pd = key + "@softmax_pd";
auto softmax_pd =
std::static_pointer_cast<mkldnn::softmax_forward::primitive_desc>(
dev_ctx.GetBlob(key_softmax_pd));
PADDLE_ENFORCE(softmax_pd != nullptr,
"Fail to find softmax_pd in device context");
// TODO(jczaja): Add layouts support when there is a need to do so
// Two dimensional softmax does support NC format
auto data_softmax_md = MKLDNNMemDesc(
{softmax_tz}, platform::MKLDNNGetDataType<T>(), memory::format::nc);
auto diff_softmax_md = MKLDNNMemDesc(
{softmax_tz}, platform::MKLDNNGetDataType<T>(), memory::format::nc);
// Normalization is made after innermost dimension eg. C out of NC
auto softmax_bwd_desc =
softmax_backward::desc(diff_softmax_md, data_softmax_md, 1 /* dim: C*/);
auto softmax_bwd_pd =
std::make_shared<mkldnn::softmax_backward::primitive_desc>(
softmax_bwd_desc, mkldnn_engine, *softmax_pd);
SoftmaxMKLDNNHandler handler(softmax_pd, softmax_bwd_pd, dev_ctx,
mkldnn_engine, key);
auto dst_memory_p =
handler.AcquireDstMemory(data_softmax_md, to_void_cast<T>(dst_data));
auto diff_dst_memory_p = handler.AcquireDiffDstMemory(
diff_softmax_md, to_void_cast<T>(diff_dst_ptr));
auto diff_src_memory_p = handler.AcquireDiffSrcMemory(
diff_softmax_md, to_void_cast<T>(diff_src_ptr));
// Get primitve from device context
auto softmax_bwd_p = handler.AcquireSoftmaxBackward(
dst_memory_p, diff_dst_memory_p, diff_src_memory_p);
std::vector<primitive> pipeline{*softmax_bwd_p};
stream(stream::kind::eager).submit(pipeline).wait();
}
};
} // namespace operators } // namespace operators
} // namespace paddle } // namespace paddle
...@@ -127,3 +242,5 @@ namespace ops = paddle::operators; ...@@ -127,3 +242,5 @@ namespace ops = paddle::operators;
REGISTER_OP_KERNEL(softmax, MKLDNN, ::paddle::platform::CPUPlace, REGISTER_OP_KERNEL(softmax, MKLDNN, ::paddle::platform::CPUPlace,
ops::SoftmaxMKLDNNKernel<float>); ops::SoftmaxMKLDNNKernel<float>);
REGISTER_OP_KERNEL(softmax_grad, MKLDNN, ::paddle::platform::CPUPlace,
ops::SoftmaxMKLDNNGradKernel<float>);
...@@ -145,16 +145,30 @@ class SoftmaxOpGrad : public framework::OperatorWithKernel { ...@@ -145,16 +145,30 @@ class SoftmaxOpGrad : public framework::OperatorWithKernel {
const framework::ExecutionContext& ctx) const override { const framework::ExecutionContext& ctx) const override {
// choose cudnn kernel if the runtime supported. // choose cudnn kernel if the runtime supported.
framework::LibraryType library_{framework::LibraryType::kPlain}; framework::LibraryType library_{framework::LibraryType::kPlain};
std::string data_format = ctx.Attr<std::string>("data_format");
framework::DataLayout layout_ = framework::StringToDataLayout(data_format);
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
if (platform::CanCUDNNBeUsed(ctx)) { if (platform::CanCUDNNBeUsed(ctx)) {
library_ = framework::LibraryType::kCUDNN; library_ = framework::LibraryType::kCUDNN;
} }
#endif #endif
std::string data_format = ctx.Attr<std::string>("data_format"); #ifdef PADDLE_WITH_MKLDNN
return framework::OpKernelType( if (library_ == framework::LibraryType::kPlain &&
framework::ToDataType(ctx.Input<Tensor>("X")->type()), ctx.GetPlace(), platform::CanMKLDNNBeUsed(ctx)) {
framework::StringToDataLayout(data_format), library_); library_ = framework::LibraryType::kMKLDNN;
layout_ = framework::DataLayout::kMKLDNN;
}
#endif
auto input_data_type =
framework::ToDataType(ctx.Input<Tensor>("X")->type());
if (input_data_type == framework::proto::VarType::FP16) {
PADDLE_ENFORCE(platform::is_gpu_place(ctx.GetPlace()),
"float16 can only be used on GPU place");
}
return framework::OpKernelType(input_data_type, ctx.GetPlace(), layout_,
library_);
} }
}; };
......
cc_library(dynamic_loader SRCS dynamic_loader.cc DEPS glog gflags enforce) cc_library(dynamic_loader SRCS dynamic_loader.cc DEPS glog gflags enforce)
list(APPEND CUDA_SRCS cublas.cc cudnn.cc curand.cc nccl.cc) list(APPEND CUDA_SRCS cublas.cc cudnn.cc curand.cc)
# There is no macOS version of NCCL.
if (NOT APPLE)
list(APPEND CUDA_SRCS nccl.cc)
endif()
if (TENSORRT_FOUND) if (TENSORRT_FOUND)
list(APPEND CUDA_SRCS tensorrt.cc) list(APPEND CUDA_SRCS tensorrt.cc)
endif() endif()
configure_file(cupti_lib_path.h.in ${CMAKE_CURRENT_BINARY_DIR}/cupti_lib_path.h) configure_file(cupti_lib_path.h.in ${CMAKE_CURRENT_BINARY_DIR}/cupti_lib_path.h)
if (CUPTI_FOUND) if (CUPTI_FOUND)
list(APPEND CUDA_SRCS cupti.cc) list(APPEND CUDA_SRCS cupti.cc)
......
...@@ -44,8 +44,10 @@ limitations under the License. */ ...@@ -44,8 +44,10 @@ limitations under the License. */
#include "paddle/fluid/platform/dynload/cublas.h" #include "paddle/fluid/platform/dynload/cublas.h"
#include "paddle/fluid/platform/dynload/cudnn.h" #include "paddle/fluid/platform/dynload/cudnn.h"
#include "paddle/fluid/platform/dynload/curand.h" #include "paddle/fluid/platform/dynload/curand.h"
#ifndef __APPLE__
#include "paddle/fluid/platform/dynload/nccl.h" #include "paddle/fluid/platform/dynload/nccl.h"
#endif #endif // __APPLE__
#endif // PADDLE_WITH_CUDA
namespace paddle { namespace paddle {
namespace platform { namespace platform {
...@@ -174,6 +176,7 @@ inline typename std::enable_if<sizeof...(Args) != 0, void>::type throw_on_error( ...@@ -174,6 +176,7 @@ inline typename std::enable_if<sizeof...(Args) != 0, void>::type throw_on_error(
throw std::runtime_error(err + string::Sprintf(args...)); throw std::runtime_error(err + string::Sprintf(args...));
} }
#ifndef __APPLE__
template <typename... Args> template <typename... Args>
inline typename std::enable_if<sizeof...(Args) != 0, void>::type throw_on_error( inline typename std::enable_if<sizeof...(Args) != 0, void>::type throw_on_error(
ncclResult_t stat, const Args&... args) { ncclResult_t stat, const Args&... args) {
...@@ -184,7 +187,7 @@ inline typename std::enable_if<sizeof...(Args) != 0, void>::type throw_on_error( ...@@ -184,7 +187,7 @@ inline typename std::enable_if<sizeof...(Args) != 0, void>::type throw_on_error(
string::Sprintf(args...)); string::Sprintf(args...));
} }
} }
#endif // __APPLE__
#endif // PADDLE_WITH_CUDA #endif // PADDLE_WITH_CUDA
template <typename T> template <typename T>
......
...@@ -105,5 +105,137 @@ inline mkldnn::memory::format GetMKLDNNFormat( ...@@ -105,5 +105,137 @@ inline mkldnn::memory::format GetMKLDNNFormat(
memory.dst_primitive_desc().desc().data.format); memory.dst_primitive_desc().desc().data.format);
} }
class MKLDNNHandler {
public:
MKLDNNHandler(const MKLDNNDeviceContext& dev_ctx, mkldnn::engine engine,
const std::string& base_key)
: dev_ctx_(dev_ctx),
engine_(engine),
key_(base_key),
is_reusing_(false) {}
std::shared_ptr<mkldnn::memory> AcquireSrcMemory(
const mkldnn::memory::desc& md, void* ptr) {
return this->AcquireMemory(md, ptr, "@user_src_mem_p");
}
std::shared_ptr<mkldnn::memory> AcquireWeightsMemory(
const mkldnn::memory::desc& md, void* ptr) {
return this->AcquireMemory(md, ptr, "@user_weights_mem_p");
}
std::shared_ptr<mkldnn::memory> AcquireDstMemory(
const mkldnn::memory::desc& md, void* ptr) {
return this->AcquireMemory(md, ptr, "@user_dst_mem_p");
}
std::shared_ptr<mkldnn::memory> AcquireDiffDstMemory(
const mkldnn::memory::desc& md, void* ptr) {
return this->AcquireMemory(md, ptr, "@user_diff_dst_mem_p");
}
std::shared_ptr<mkldnn::memory> AcquireDiffSrcMemory(
const mkldnn::memory::desc& md, void* ptr) {
return this->AcquireMemory(md, ptr, "@user_diff_src_mem_p");
}
std::shared_ptr<mkldnn::memory> AcquireMemoryFromPrimitive(
mkldnn::memory::primitive_desc mdp, void* ptr,
const std::string& suffix) {
auto local_key = key_ + suffix;
auto mem_p =
std::static_pointer_cast<mkldnn::memory>(dev_ctx_.GetBlob(local_key));
PADDLE_ENFORCE((mem_p != nullptr) || (is_reusing_ == false),
"Fail to find mem primitive in device context");
if (mem_p == nullptr) {
mem_p = std::make_shared<mkldnn::memory>(mdp, ptr);
dev_ctx_.SetBlob(local_key, mem_p);
} else {
mem_p->set_data_handle(ptr);
// Mark that reusing happenned. All primitives from operator instance
// should be reused or none of them. So we check consistency
is_reusing_ = true;
}
return mem_p;
}
std::shared_ptr<mkldnn::memory> AcquireMemory(const mkldnn::memory::desc& md,
void* ptr,
const std::string& suffix) {
/*Generate key*/
auto local_key = key_ + suffix;
auto mem_p =
std::static_pointer_cast<mkldnn::memory>(dev_ctx_.GetBlob(local_key));
PADDLE_ENFORCE((mem_p != nullptr) || (is_reusing_ == false),
"Fail to find mem primitive in device context");
if (mem_p == nullptr) {
mem_p = std::make_shared<mkldnn::memory>(
mkldnn::memory::primitive_desc{md, engine_}, ptr);
dev_ctx_.SetBlob(local_key, mem_p);
} else {
mem_p->set_data_handle(ptr);
// Mark that reusing happenned. All primitives from operator instance
// should be reused or none of them. So we check consistency
is_reusing_ = true;
}
return mem_p;
}
std::shared_ptr<mkldnn::memory> AcquireMemory(
mkldnn::memory::primitive_desc& mpd,
mkldnn::memory::primitive_desc& user_mpd,
const std::shared_ptr<mkldnn::memory> user_memory_p,
const std::string& suffix, std::vector<mkldnn::primitive>& pipeline) {
// create reorder primitive if the input format is not the preferred one
auto local_key = key_ + suffix;
auto key_reorder_p = key_ + suffix + "reorder_p";
auto target_memory_p =
std::static_pointer_cast<mkldnn::memory>(dev_ctx_.GetBlob(local_key));
PADDLE_ENFORCE((target_memory_p != nullptr) || (is_reusing_ == false),
"Fail to find mem primitive in device context");
if (target_memory_p == nullptr) {
target_memory_p = user_memory_p;
std::shared_ptr<mkldnn::primitive> reorder_p;
if (mpd != user_mpd) {
target_memory_p = std::make_shared<mkldnn::memory>(mpd);
auto reorder_p =
std::make_shared<mkldnn::reorder>(*user_memory_p, *target_memory_p);
dev_ctx_.SetBlob(key_reorder_p, reorder_p);
pipeline.push_back(*reorder_p);
}
dev_ctx_.SetBlob(local_key, target_memory_p);
} else {
// Make reorder if needed
auto reorder_p = std::static_pointer_cast<mkldnn::reorder>(
dev_ctx_.GetBlob(key_reorder_p));
if (reorder_p != nullptr) {
pipeline.push_back(*reorder_p);
}
is_reusing_ = true;
}
return target_memory_p;
}
static std::string GetHash(mkldnn::memory::dims& operand_dims,
const std::string& suffix) {
auto dims2str = [](const mkldnn::memory::dims& operand_dims) {
std::string dstr = "";
for (size_t i = 0; i < operand_dims.size(); ++i) {
dstr += std::to_string(operand_dims[i]) + "-";
}
return dstr;
};
return dims2str(operand_dims) + suffix;
};
protected:
const MKLDNNDeviceContext& dev_ctx_;
mkldnn::engine engine_;
std::string key_;
bool is_reusing_;
};
} // namespace platform } // namespace platform
} // namespace paddle } // namespace paddle
...@@ -268,7 +268,8 @@ void BindOpDesc(pybind11::module *m) { ...@@ -268,7 +268,8 @@ void BindOpDesc(pybind11::module *m) {
.value("STRINGS", pd::proto::AttrType::STRINGS) .value("STRINGS", pd::proto::AttrType::STRINGS)
.value("BOOL", pd::proto::AttrType::BOOLEAN) .value("BOOL", pd::proto::AttrType::BOOLEAN)
.value("BOOLS", pd::proto::AttrType::BOOLEANS) .value("BOOLS", pd::proto::AttrType::BOOLEANS)
.value("BLOCK", pd::proto::AttrType::BLOCK); .value("BLOCK", pd::proto::AttrType::BLOCK)
.value("BLOCKS", pd::proto::AttrType::BLOCKS);
pybind11::class_<pd::OpDesc> op_desc(*m, "OpDesc", ""); pybind11::class_<pd::OpDesc> op_desc(*m, "OpDesc", "");
op_desc op_desc
...@@ -293,6 +294,7 @@ void BindOpDesc(pybind11::module *m) { ...@@ -293,6 +294,7 @@ void BindOpDesc(pybind11::module *m) {
.def("set_attr", &pd::OpDesc::SetAttr) .def("set_attr", &pd::OpDesc::SetAttr)
.def("attr", &pd::OpDesc::GetAttr) .def("attr", &pd::OpDesc::GetAttr)
.def("set_block_attr", &pd::OpDesc::SetBlockAttr) .def("set_block_attr", &pd::OpDesc::SetBlockAttr)
.def("set_blocks_attr", &pd::OpDesc::SetBlocksAttr)
.def("set_serialized_attr", .def("set_serialized_attr",
[](pd::OpDesc &self, const std::string &name, [](pd::OpDesc &self, const std::string &name,
const pybind11::bytes &seriralized) { const pybind11::bytes &seriralized) {
......
...@@ -167,9 +167,6 @@ PYBIND11_PLUGIN(core) { ...@@ -167,9 +167,6 @@ PYBIND11_PLUGIN(core) {
.def("set_lod", .def("set_lod",
[](LoDTensor &self, const std::vector<std::vector<size_t>> &lod) { [](LoDTensor &self, const std::vector<std::vector<size_t>> &lod) {
// the input lod is offset-based level-of-detail info // the input lod is offset-based level-of-detail info
LOG(WARNING)
<< "set_lod is deprecated and will be removed by 9.2018, "
"please switch to set_recursive_sequence_lengths.";
LoD new_lod; LoD new_lod;
new_lod.reserve(lod.size()); new_lod.reserve(lod.size());
std::copy(lod.begin(), lod.end(), std::back_inserter(new_lod)); std::copy(lod.begin(), lod.end(), std::back_inserter(new_lod));
...@@ -196,8 +193,6 @@ PYBIND11_PLUGIN(core) { ...@@ -196,8 +193,6 @@ PYBIND11_PLUGIN(core) {
.def("lod", .def("lod",
[](LoDTensor &self) -> std::vector<std::vector<size_t>> { [](LoDTensor &self) -> std::vector<std::vector<size_t>> {
// output the offset-based lod info // output the offset-based lod info
LOG(WARNING) << "lod is deprecated and will be removed by 9.2018, "
"please switch to recursive_sequence_lengths.";
LoD lod = self.lod(); LoD lod = self.lod();
std::vector<std::vector<size_t>> new_lod; std::vector<std::vector<size_t>> new_lod;
new_lod.reserve(lod.size()); new_lod.reserve(lod.size());
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
function print_usage() { function print_usage() {
echo -e "\n${RED}Usage${NONE}: echo -e "\n${RED}Usage${NONE}:
${BOLD}${SCRIPT_NAME}${NONE} [OPTION]" ${BOLD}${SCRIPT_NAME}${NONE} [OPTION]"
echo -e "\n${RED}Options${NONE}: echo -e "\n${RED}Options${NONE}:
${BLUE}build${NONE}: run build for x86 platform ${BLUE}build${NONE}: run build for x86 platform
${BLUE}build_android${NONE}: run build for android platform ${BLUE}build_android${NONE}: run build for android platform
...@@ -133,7 +133,7 @@ EOF ...@@ -133,7 +133,7 @@ EOF
-DWITH_FLUID_ONLY=${WITH_FLUID_ONLY:-OFF} \ -DWITH_FLUID_ONLY=${WITH_FLUID_ONLY:-OFF} \
-DCMAKE_EXPORT_COMPILE_COMMANDS=ON \ -DCMAKE_EXPORT_COMPILE_COMMANDS=ON \
-DWITH_CONTRIB=${WITH_CONTRIB:-ON} \ -DWITH_CONTRIB=${WITH_CONTRIB:-ON} \
-DWITH_ANAKIN=ON -DWITH_ANAKIN=${WITH_ANAKIN:-ON}
} }
function abort(){ function abort(){
...@@ -198,7 +198,7 @@ function build_android() { ...@@ -198,7 +198,7 @@ function build_android() {
fi fi
ANDROID_STANDALONE_TOOLCHAIN=$ANDROID_TOOLCHAINS_DIR/$ANDROID_ARCH-android-$ANDROID_API ANDROID_STANDALONE_TOOLCHAIN=$ANDROID_TOOLCHAINS_DIR/$ANDROID_ARCH-android-$ANDROID_API
cat <<EOF cat <<EOF
============================================ ============================================
Generating the standalone toolchain ... Generating the standalone toolchain ...
...@@ -212,13 +212,13 @@ EOF ...@@ -212,13 +212,13 @@ EOF
--arch=$ANDROID_ARCH \ --arch=$ANDROID_ARCH \
--platform=android-$ANDROID_API \ --platform=android-$ANDROID_API \
--install-dir=$ANDROID_STANDALONE_TOOLCHAIN --install-dir=$ANDROID_STANDALONE_TOOLCHAIN
BUILD_ROOT=${PADDLE_ROOT}/build_android BUILD_ROOT=${PADDLE_ROOT}/build_android
DEST_ROOT=${PADDLE_ROOT}/install_android DEST_ROOT=${PADDLE_ROOT}/install_android
mkdir -p $BUILD_ROOT mkdir -p $BUILD_ROOT
cd $BUILD_ROOT cd $BUILD_ROOT
if [ $ANDROID_ABI == "armeabi-v7a" ]; then if [ $ANDROID_ABI == "armeabi-v7a" ]; then
cmake -DCMAKE_SYSTEM_NAME=Android \ cmake -DCMAKE_SYSTEM_NAME=Android \
-DANDROID_STANDALONE_TOOLCHAIN=$ANDROID_STANDALONE_TOOLCHAIN \ -DANDROID_STANDALONE_TOOLCHAIN=$ANDROID_STANDALONE_TOOLCHAIN \
...@@ -286,7 +286,7 @@ function build_ios() { ...@@ -286,7 +286,7 @@ function build_ios() {
-DWITH_TESTING=OFF \ -DWITH_TESTING=OFF \
-DWITH_SWIG_PY=OFF \ -DWITH_SWIG_PY=OFF \
-DCMAKE_BUILD_TYPE=Release -DCMAKE_BUILD_TYPE=Release
make -j 2 make -j 2
} }
...@@ -331,14 +331,14 @@ EOF ...@@ -331,14 +331,14 @@ EOF
function bind_test() { function bind_test() {
# the number of process to run tests # the number of process to run tests
NUM_PROC=6 NUM_PROC=6
# calculate and set the memory usage for each process # calculate and set the memory usage for each process
MEM_USAGE=$(printf "%.2f" `echo "scale=5; 1.0 / $NUM_PROC" | bc`) MEM_USAGE=$(printf "%.2f" `echo "scale=5; 1.0 / $NUM_PROC" | bc`)
export FLAGS_fraction_of_gpu_memory_to_use=$MEM_USAGE export FLAGS_fraction_of_gpu_memory_to_use=$MEM_USAGE
# get the CUDA device count # get the CUDA device count
CUDA_DEVICE_COUNT=$(nvidia-smi -L | wc -l) CUDA_DEVICE_COUNT=$(nvidia-smi -L | wc -l)
for (( i = 0; i < $NUM_PROC; i++ )); do for (( i = 0; i < $NUM_PROC; i++ )); do
cuda_list=() cuda_list=()
for (( j = 0; j < $CUDA_DEVICE_COUNT; j++ )); do for (( j = 0; j < $CUDA_DEVICE_COUNT; j++ )); do
......
...@@ -41,7 +41,12 @@ def _clone_var_(block, var): ...@@ -41,7 +41,12 @@ def _clone_var_(block, var):
class Evaluator(object): class Evaluator(object):
""" """
Base Class for all evaluators Warning: better to use the fluid.metrics.* things, more
flexible support via pure Python and Operator, and decoupled
with executor. Short doc are intended to urge new user
start from Metrics.
Base Class for all evaluators.
Args: Args:
name(str): The name of evaluator. such as, "accuracy". Used for generate name(str): The name of evaluator. such as, "accuracy". Used for generate
...@@ -69,6 +74,10 @@ class Evaluator(object): ...@@ -69,6 +74,10 @@ class Evaluator(object):
def reset(self, executor, reset_program=None): def reset(self, executor, reset_program=None):
""" """
reset metric states at the begin of each pass/user specified batch reset metric states at the begin of each pass/user specified batch
Args:
executor(Executor|ParallelExecutor): a executor for executing the reset_program
reset_program(Program): a single Program for reset process
""" """
if reset_program is None: if reset_program is None:
reset_program = Program() reset_program = Program()
...@@ -85,15 +94,16 @@ class Evaluator(object): ...@@ -85,15 +94,16 @@ class Evaluator(object):
def eval(self, executor, eval_program=None): def eval(self, executor, eval_program=None):
""" """
Evaluate the statistics merged by multiple mini-batches. Evaluate the statistics merged by multiple mini-batches.
Args:
executor(Executor|ParallelExecutor): a executor for executing the eval_program
eval_program(Program): a single Program for eval process
""" """
raise NotImplementedError() raise NotImplementedError()
def create_state(self, suffix, dtype, shape): def _create_state(self, suffix, dtype, shape):
""" """
Create state variable. Create state variable.
NOTE: It is not a public API.
Args: Args:
suffix(str): the state suffix. suffix(str): the state suffix.
dtype(str|core.VarDesc.VarType): the state data type dtype(str|core.VarDesc.VarType): the state data type
...@@ -113,9 +123,35 @@ class Evaluator(object): ...@@ -113,9 +123,35 @@ class Evaluator(object):
class ChunkEvaluator(Evaluator): class ChunkEvaluator(Evaluator):
""" """
Warning: This would be deprecated in the future. Please use fluid.metrics.ChunkEvaluator
instead.
Accumulate counter numbers output by chunk_eval from mini-batches and Accumulate counter numbers output by chunk_eval from mini-batches and
compute the precision recall and F1-score using the accumulated counter compute the precision recall and F1-score using the accumulated counter
numbers. numbers.
For some basics of chunking, please refer to
'Chunking with Support Vector Machines <https://aclanthology.info/pdf/N/N01/N01-1025.pdf>'.
Args:
input (Variable): prediction output of the network.
label (Variable): label of the test data set.
chunk_scheme (str): can be IOB/IOE/IOBES and IO. See the chunk_eval op for details.
num_chunk_types (int): the number of chunk type.
excluded_chunk_types (list): A list including chunk type ids, indicating chunk types that are not counted.
Returns:
tuple: tuple containing: precision, recall, f1_score
Examples:
.. code-block:: python
exe = fluid.executor(place)
evaluator = fluid.Evaluator.ChunkEvaluator(input, label)
for epoch in PASS_NUM:
evaluator.reset(exe)
for data in batches:
loss = exe.run(fetch_list=[cost])
distance, instance_error = distance_evaluator.eval(exe)
""" """
def __init__( def __init__(
...@@ -130,11 +166,11 @@ class ChunkEvaluator(Evaluator): ...@@ -130,11 +166,11 @@ class ChunkEvaluator(Evaluator):
if main_program.current_block().idx != 0: if main_program.current_block().idx != 0:
raise ValueError("You can only invoke Evaluator in root block") raise ValueError("You can only invoke Evaluator in root block")
self.num_infer_chunks = self.create_state( self.num_infer_chunks = self._create_state(
dtype='int64', shape=[1], suffix='num_infer_chunks') dtype='int64', shape=[1], suffix='num_infer_chunks')
self.num_label_chunks = self.create_state( self.num_label_chunks = self._create_state(
dtype='int64', shape=[1], suffix='num_label_chunks') dtype='int64', shape=[1], suffix='num_label_chunks')
self.num_correct_chunks = self.create_state( self.num_correct_chunks = self._create_state(
dtype='int64', shape=[1], suffix='num_correct_chunks') dtype='int64', shape=[1], suffix='num_correct_chunks')
precision, recall, f1_score, num_infer_chunks, num_label_chunks, num_correct_chunks = layers.chunk_eval( precision, recall, f1_score, num_infer_chunks, num_label_chunks, num_correct_chunks = layers.chunk_eval(
input=input, input=input,
...@@ -178,6 +214,8 @@ class ChunkEvaluator(Evaluator): ...@@ -178,6 +214,8 @@ class ChunkEvaluator(Evaluator):
class EditDistance(Evaluator): class EditDistance(Evaluator):
""" """
Warning: This would be deprecated in the future. Please use fluid.metrics.EditDistance
instead.
Accumulate edit distance sum and sequence number from mini-batches and Accumulate edit distance sum and sequence number from mini-batches and
compute the average edit_distance and instance error of all batches. compute the average edit_distance and instance error of all batches.
...@@ -188,15 +226,16 @@ class EditDistance(Evaluator): ...@@ -188,15 +226,16 @@ class EditDistance(Evaluator):
ignored_tokens(list of int): Tokens that should be removed before ignored_tokens(list of int): Tokens that should be removed before
calculating edit distance. calculating edit distance.
Example: Examples:
.. code-block:: python
exe = fluid.executor(place) exe = fluid.executor(place)
distance_evaluator = fluid.Evaluator.EditDistance(input, label) distance_evaluator = fluid.Evaluator.EditDistance(input, label)
for epoch in PASS_NUM: for epoch in PASS_NUM:
distance_evaluator.reset(exe) distance_evaluator.reset(exe)
for data in batches: for data in batches:
loss = exe.run(fetch_list=[cost]) loss = exe.run(fetch_list=[cost])
distance, instance_error = distance_evaluator.eval(exe) distance, instance_error = distance_evaluator.eval(exe)
In the above example: In the above example:
'distance' is the average of the edit distance in a pass. 'distance' is the average of the edit distance in a pass.
...@@ -210,11 +249,11 @@ class EditDistance(Evaluator): ...@@ -210,11 +249,11 @@ class EditDistance(Evaluator):
if main_program.current_block().idx != 0: if main_program.current_block().idx != 0:
raise ValueError("You can only invoke Evaluator in root block") raise ValueError("You can only invoke Evaluator in root block")
self.total_distance = self.create_state( self.total_distance = self._create_state(
dtype='float32', shape=[1], suffix='total_distance') dtype='float32', shape=[1], suffix='total_distance')
self.seq_num = self.create_state( self.seq_num = self._create_state(
dtype='int64', shape=[1], suffix='seq_num') dtype='int64', shape=[1], suffix='seq_num')
self.instance_error = self.create_state( self.instance_error = self._create_state(
dtype='int64', shape=[1], suffix='instance_error') dtype='int64', shape=[1], suffix='instance_error')
distances, seq_num = layers.edit_distance( distances, seq_num = layers.edit_distance(
input=input, label=label, ignored_tokens=ignored_tokens) input=input, label=label, ignored_tokens=ignored_tokens)
...@@ -256,9 +295,10 @@ class EditDistance(Evaluator): ...@@ -256,9 +295,10 @@ class EditDistance(Evaluator):
class DetectionMAP(Evaluator): class DetectionMAP(Evaluator):
""" """
Warning: This would be deprecated in the future. Please use fluid.metrics.DetectionMAP
instead.
Calculate the detection mean average precision (mAP). Calculate the detection mean average precision (mAP).
TODO (Dang Qingqing): update the following doc.
The general steps are as follows: The general steps are as follows:
1. calculate the true positive and false positive according to the input 1. calculate the true positive and false positive according to the input
of detection and labels. of detection and labels.
...@@ -293,17 +333,18 @@ class DetectionMAP(Evaluator): ...@@ -293,17 +333,18 @@ class DetectionMAP(Evaluator):
- 11point: the 11-point interpolated average precision. - 11point: the 11-point interpolated average precision.
- integral: the natural integral of the precision-recall curve. - integral: the natural integral of the precision-recall curve.
Example: Examples:
.. code-block:: python
exe = fluid.executor(place) exe = fluid.executor(place)
map_evaluator = fluid.Evaluator.DetectionMAP(input, map_evaluator = fluid.Evaluator.DetectionMAP(input,
gt_label, gt_box, gt_difficult) gt_label, gt_box, gt_difficult)
cur_map, accum_map = map_evaluator.get_map_var() cur_map, accum_map = map_evaluator.get_map_var()
fetch = [cost, cur_map, accum_map] fetch = [cost, cur_map, accum_map]
for epoch in PASS_NUM: for epoch in PASS_NUM:
map_evaluator.reset(exe) map_evaluator.reset(exe)
for data in batches: for data in batches:
loss, cur_map_v, accum_map_v = exe.run(fetch_list=fetch) loss, cur_map_v, accum_map_v = exe.run(fetch_list=fetch)
In the above example: In the above example:
...@@ -340,9 +381,10 @@ class DetectionMAP(Evaluator): ...@@ -340,9 +381,10 @@ class DetectionMAP(Evaluator):
evaluate_difficult=evaluate_difficult, evaluate_difficult=evaluate_difficult,
ap_version=ap_version) ap_version=ap_version)
self.create_state(dtype='int32', shape=None, suffix='accum_pos_count') self._create_state(dtype='int32', shape=None, suffix='accum_pos_count')
self.create_state(dtype='float32', shape=None, suffix='accum_true_pos') self._create_state(dtype='float32', shape=None, suffix='accum_true_pos')
self.create_state(dtype='float32', shape=None, suffix='accum_false_pos') self._create_state(
dtype='float32', shape=None, suffix='accum_false_pos')
self.has_state = None self.has_state = None
var = self.helper.create_variable( var = self.helper.create_variable(
......
...@@ -18,7 +18,7 @@ from framework import Program, default_main_program, Variable ...@@ -18,7 +18,7 @@ from framework import Program, default_main_program, Variable
from . import core from . import core
__all__ = [ __all__ = [
'Executor', 'global_scope', 'scope_guard', 'switch_scope', 'fetch_var' 'Executor', 'global_scope', 'scope_guard', '_switch_scope', 'fetch_var'
] ]
g_scope = core.Scope() g_scope = core.Scope()
...@@ -35,7 +35,7 @@ def global_scope(): ...@@ -35,7 +35,7 @@ def global_scope():
return g_scope return g_scope
def switch_scope(scope): def _switch_scope(scope):
global g_scope global g_scope
ex = g_scope ex = g_scope
g_scope = scope g_scope = scope
...@@ -57,12 +57,27 @@ def scope_guard(scope): ...@@ -57,12 +57,27 @@ def scope_guard(scope):
Args: Args:
scope: The new global/default scope. scope: The new global/default scope.
""" """
ex = switch_scope(scope) ex = _switch_scope(scope)
yield yield
switch_scope(ex) _switch_scope(ex)
def as_numpy(tensor): def as_numpy(tensor):
"""
Convert a Tensor to a numpy.ndarray, its only support Tensor without LoD information.
For higher dimensional sequence data, please use LoDTensor directly.
Examples:
>>> import paddle.fluid as fluid
>>> outs = executor.run(...)
>>> np_outs = map(lambda x: as_numpy(x), outs)
>>> ...
Args:
tensor(Variable): a instance of Tensor
Returns:
numpy.ndarray
"""
if isinstance(tensor, list): if isinstance(tensor, list):
return [as_numpy(t) for t in tensor] return [as_numpy(t) for t in tensor]
assert isinstance(tensor, core.LoDTensor) assert isinstance(tensor, core.LoDTensor)
...@@ -186,7 +201,7 @@ def fetch_var(name, scope=None, return_numpy=True): ...@@ -186,7 +201,7 @@ def fetch_var(name, scope=None, return_numpy=True):
return tensor return tensor
def get_program_cache_key(feed, fetch_list): def _get_program_cache_key(feed, fetch_list):
feed_var_names = feed.keys() feed_var_names = feed.keys()
def to_name_str(var): def to_name_str(var):
...@@ -205,6 +220,25 @@ def get_program_cache_key(feed, fetch_list): ...@@ -205,6 +220,25 @@ def get_program_cache_key(feed, fetch_list):
class Executor(object): class Executor(object):
"""
An Executor in Python, only support the single-GPU running. For multi-cards, please refer to
ParallelExecutor.
Python executor takes a program, add feed operators and fetch operators to this program according
to feed map and fetch_list. Feed map provides input data for the program. fetch_list provides
the variables(or names) that user want to get after program run. Note: the executor will run all
operators in the program but not only the operators dependent by the fetch_list.
It store the global variables into the global scope, and create a local scope for the temporary
variables. The local scope contents will be discarded after every minibatch forward/backward finished.
But the global scope variables will be persistent through different runs.
All of ops in program will be running in sequence.
Args:
place(core.CPUPlace|core.CUDAPlace(n)): indicate the executor run on which device
Note: For debugging complicated network in parallel-GPUs, you can test it on the executor.
They has the exactly same arguments, and expected the same results.
"""
def __init__(self, place): def __init__(self, place):
self.place = place self.place = place
p = core.Place() p = core.Place()
...@@ -213,6 +247,23 @@ class Executor(object): ...@@ -213,6 +247,23 @@ class Executor(object):
self.program_caches = dict() self.program_caches = dict()
def as_lodtensor(self, data): def as_lodtensor(self, data):
"""
Convert numpy.ndarray to Tensor, its only support Tensor without LoD information.
For higher dimensional sequence data, please use LoDTensor directly.
Examples:
>>> import paddle.fluid as fluid
>>> exe = fluid.executor(fluid.CPUPlace())
>>> data = np.array(size=(100, 200, 300))
>>> np_outs = map(lambda x: exe.as_lodtensor(x), data)
>>> ...
Args:
data(numpy.ndarray): a instance of array
Returns:
LoDTensor
"""
if isinstance(data, list): if isinstance(data, list):
raise RuntimeError("Some of your feed data hold LoD information. \ raise RuntimeError("Some of your feed data hold LoD information. \
They can not be completely cast from a list of Python \ They can not be completely cast from a list of Python \
...@@ -304,23 +355,47 @@ class Executor(object): ...@@ -304,23 +355,47 @@ class Executor(object):
scope=None, scope=None,
return_numpy=True, return_numpy=True,
use_program_cache=False): use_program_cache=False):
""" Run program by this Executor. Feed data by feed map, fetch result by fetch_list. """
Run program by this Executor. Feed data by feed map, fetch result by fetch_list.
Python executor takes a program, add feed operators and fetch operators to this program according Python executor takes a program, add feed operators and fetch operators to this program according
to feed map and fetch_list. Feed map provides input data for the program. fetch_list provides to feed map and fetch_list. Feed map provides input data for the program. fetch_list provides
the variables(or names) that user want to get after program run. Note: the executor will run all the variables(or names) that user want to get after program run.
Note: the executor will run all
operators in the program but not only the operators dependent by the fetch_list operators in the program but not only the operators dependent by the fetch_list
:param program: the program that need to run, if not provied, then default_main_program will be used. Args:
:param feed: feed variable map, e.g. {"image": ImageData, "label": LableData} program(Program): the program that need to run, if not provied, then default_main_program will be used.
:param fetch_list: a list of variable or variable names that user want to get, run will return them according feed(dict): feed variable map, e.g. {"image": ImageData, "label": LableData}
to this list. fetch_list(list): a list of variable or variable names that user want to get, run will return them according to this list.
:param feed_var_name: the name for the input variable of feed Operator. feed_var_name(str): the name for the input variable of feed Operator.
:param fetch_var_name: the name for the output variable of feed Operator. fetch_var_name(str): the name for the output variable of fetch Operator.
:param scope: the scope used to run this program, you can switch it to different scope. default is global_scope scope(Scope): the scope used to run this program, you can switch it to different scope. default is global_scope
:param return_numpy: if convert the fetched tensor to numpy return_numpy(bool): if convert the fetched tensor to numpy
:param use_program_cache: set use_program_cache to true if program not changed compare to the last step. use_program_cache(bool): set use_program_cache to true if program not changed compare to the last step.
:return: result according to fetch_list.
Returns:
list(numpy.array): fetch result according to fetch_list.
Examples:
>>> data = layers.data(name='X', shape=[1], dtype='float32')
>>> hidden = layers.fc(input=data, size=10)
>>> layers.assign(hidden, out)
>>> loss = layers.mean(out)
>>> adam = fluid.optimizer.Adam()
>>> adam.minimize(loss)
>>> cpu = core.CPUPlace()
>>> exe = Executor(cpu)
>>> exe.run(default_startup_program())
>>> x = numpy.random.random(size=(10, 1)).astype('float32')
>>> outs = exe.run(
>>> feed={'X': x},
>>> fetch_list=[loss.name])
""" """
if feed is None: if feed is None:
feed = {} feed = {}
...@@ -341,7 +416,7 @@ class Executor(object): ...@@ -341,7 +416,7 @@ class Executor(object):
if scope is None: if scope is None:
scope = global_scope() scope = global_scope()
cache_key = get_program_cache_key(feed, fetch_list) cache_key = _get_program_cache_key(feed, fetch_list)
if use_program_cache: if use_program_cache:
cached_program = self._get_program_cache(cache_key) cached_program = self._get_program_cache(cache_key)
if cached_program is None: if cached_program is None:
......
...@@ -558,15 +558,20 @@ class Operator(object): ...@@ -558,15 +558,20 @@ class Operator(object):
if (attr_name not in self.attrs) or ( if (attr_name not in self.attrs) or (
self.attrs[attr_name] is None): self.attrs[attr_name] is None):
continue continue
if isinstance(self.attrs[attr_name], Block): attr_val = self.attrs[attr_name]
if isinstance(attr_val, Block):
self.desc.set_block_attr(attr_name, self.desc.set_block_attr(attr_name,
self.attrs[attr_name].desc) self.attrs[attr_name].desc)
elif isinstance(self.attrs[attr_name], core.BlockDesc) or \ elif isinstance(attr_val, list) and attr_val and \
isinstance(self.attrs[attr_name], core.ProgramDesc): all(isinstance(v, Block) for v in attr_val):
self.desc.set_blocks_attr(attr_name,
[v.desc for v in attr_val])
elif isinstance(attr_val, core.BlockDesc) or \
isinstance(attr_val, core.ProgramDesc):
self.desc.set_serialized_attr( self.desc.set_serialized_attr(
attr_name, self.attrs[attr_name].serialize_to_string()) attr_name, attr_val.serialize_to_string())
else: else:
self.desc.set_attr(attr_name, self.attrs[attr_name]) self.desc.set_attr(attr_name, attr_val)
self.desc.check_attrs() self.desc.check_attrs()
if self.has_kernel(type): if self.has_kernel(type):
self.desc.infer_var_type(self.block.desc) self.desc.infer_var_type(self.block.desc)
...@@ -715,6 +720,9 @@ class Operator(object): ...@@ -715,6 +720,9 @@ class Operator(object):
self.attrs[name] = val self.attrs[name] = val
if isinstance(val, Block): if isinstance(val, Block):
self.desc.set_block_attr(name, val.desc) self.desc.set_block_attr(name, val.desc)
elif isinstance(val, list) and val and all(
isinstance(v, Block) for v in val):
self.desc.set_blocks_attr(name, [v.desc for v in val])
elif isinstance(val, core.BlockDesc) or \ elif isinstance(val, core.BlockDesc) or \
isinstance(val, core.ProgramDesc): isinstance(val, core.ProgramDesc):
self.desc.set_serialized_attr(name, val.serialize_to_string()) self.desc.set_serialized_attr(name, val.serialize_to_string())
...@@ -1390,7 +1398,11 @@ class Program(object): ...@@ -1390,7 +1398,11 @@ class Program(object):
* Set for_test to True when we want to clone the program for testing. * Set for_test to True when we want to clone the program for testing.
Notes: This API DOES NOT prune any operator. Use Notes: This API DOES NOT prune any operator. Use
:code:`clone(for_test=True)` before backward and optimization please. :code:`clone(for_test=True)` before backward and optimization please. e.g.
>>> test_program = fluid.default_main_program().clone(for_test=True)
>>> optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
>>> optimizer.minimize()
Args: Args:
for_test(bool): True if change the :code:`is_test` attribute of for_test(bool): True if change the :code:`is_test` attribute of
......
...@@ -28,8 +28,8 @@ import math_op_patch ...@@ -28,8 +28,8 @@ import math_op_patch
from math_op_patch import * from math_op_patch import *
import detection import detection
from detection import * from detection import *
import metric import metric_op
from metric import * from metric_op import *
from learning_rate_scheduler import * from learning_rate_scheduler import *
__all__ = [] __all__ = []
...@@ -41,5 +41,5 @@ __all__ += control_flow.__all__ ...@@ -41,5 +41,5 @@ __all__ += control_flow.__all__
__all__ += ops.__all__ __all__ += ops.__all__
__all__ += device.__all__ __all__ += device.__all__
__all__ += detection.__all__ __all__ += detection.__all__
__all__ += metric.__all__ __all__ += metric_op.__all__
__all__ += learning_rate_scheduler.__all__ __all__ += learning_rate_scheduler.__all__
...@@ -186,7 +186,6 @@ class ListenAndServ(object): ...@@ -186,7 +186,6 @@ class ListenAndServ(object):
main_program = self.helper.main_program main_program = self.helper.main_program
current_block = main_program.current_block() current_block = main_program.current_block()
parent_block = self.parent_block() parent_block = self.parent_block()
empty_block = Program().global_block()
parent_block.append_op( parent_block.append_op(
type='listen_and_serv', type='listen_and_serv',
...@@ -195,8 +194,9 @@ class ListenAndServ(object): ...@@ -195,8 +194,9 @@ class ListenAndServ(object):
attrs={ attrs={
'endpoint': self.endpoint, 'endpoint': self.endpoint,
'Fanin': self.fan_in, 'Fanin': self.fan_in,
'OptimizeBlock': current_block, 'optimize_blocks': [
'PrefetchBlock': empty_block, current_block
], # did not support multiple optimize blocks in layers
'sync_mode': True, # did not support async now in layers 'sync_mode': True, # did not support async now in layers
'grad_to_block_id': [""] 'grad_to_block_id': [""]
}) })
......
...@@ -126,7 +126,7 @@ def auc(input, label, curve='ROC', num_thresholds=200): ...@@ -126,7 +126,7 @@ def auc(input, label, curve='ROC', num_thresholds=200):
topk_out, topk_indices = nn.topk(input, k=k) topk_out, topk_indices = nn.topk(input, k=k)
auc_out = helper.create_tmp_variable(dtype="float32") auc_out = helper.create_tmp_variable(dtype="float32")
helper.append_op( helper.append_op(
type="accuracy", type="auc",
inputs={ inputs={
"Out": [topk_out], "Out": [topk_out],
"Indices": [topk_indices], "Indices": [topk_indices],
......
...@@ -4920,16 +4920,16 @@ def random_crop(x, shape, seed=None): ...@@ -4920,16 +4920,16 @@ def random_crop(x, shape, seed=None):
return out return out
def log(x): def log(input):
""" """
Calculates the natural log of the given input tensor, element-wise. Calculates the natural log of the given input tensor, element-wise.
.. math:: .. math::
Out = \\ln(x) Out = \\ln(input)
Args: Args:
x (Variable): Input tensor. input (Variable): Input tensor.
Returns: Returns:
Variable: The natural log of the input tensor computed element-wise. Variable: The natural log of the input tensor computed element-wise.
...@@ -4938,27 +4938,27 @@ def log(x): ...@@ -4938,27 +4938,27 @@ def log(x):
.. code-block:: python .. code-block:: python
output = fluid.layers.log(x) output = fluid.layers.log(input)
""" """
helper = LayerHelper('log', **locals()) helper = LayerHelper('log', **locals())
dtype = helper.input_dtype() dtype = helper.input_dtype(input_param_name='x')
out = helper.create_tmp_variable(dtype) out = helper.create_tmp_variable(dtype)
helper.append_op(type="log", inputs={"X": input}, outputs={"Out": out}) helper.append_op(type="log", inputs={"X": x}, outputs={"Out": out})
return out return out
def relu(x): def relu(input):
""" """
Relu takes one input data (Tensor) and produces one output data (Tensor) Relu takes one input data (Tensor) and produces one output data (Tensor)
where the rectified linear function, y = max(0, x), is applied to where the rectified linear function, y = max(0, input), is applied to
the tensor elementwise. the tensor elementwise.
.. math:: .. math::
Out = \\max(0, x) Out = \\max(0, input)
Args: Args:
x (Variable): The input tensor. input (Variable): The input tensor.
Returns: Returns:
Variable: The output tensor with the same shape as input. Variable: The output tensor with the same shape as input.
...@@ -4967,12 +4967,12 @@ def relu(x): ...@@ -4967,12 +4967,12 @@ def relu(x):
.. code-block:: python .. code-block:: python
output = fluid.layers.relu(x) output = fluid.layers.relu(input)
""" """
helper = LayerHelper('relu', **locals()) helper = LayerHelper('relu', **locals())
dtype = helper.input_dtype() dtype = helper.input_dtype(input_param_name='x')
out = helper.create_tmp_variable(dtype) out = helper.create_tmp_variable(dtype)
helper.append_op(type="relu", inputs={"X": input}, outputs={"Out": out}) helper.append_op(type="relu", inputs={"X": x}, outputs={"Out": out})
return out return out
......
...@@ -230,11 +230,11 @@ def sums(input, out=None): ...@@ -230,11 +230,11 @@ def sums(input, out=None):
helper = LayerHelper('sum', **locals()) helper = LayerHelper('sum', **locals())
if out is None: if out is None:
out = helper.create_tmp_variable(dtype=helper.input_dtype()) out = helper.create_tmp_variable(dtype=helper.input_dtype())
helper.append_op( helper.append_op(
type='sum', type='sum',
inputs={'X': input}, inputs={'X': input},
outputs={'Out': out}, outputs={'Out': out},
attrs={'use_mkldnn': False}) attrs={'use_mkldnn': False})
return out return out
......
...@@ -23,6 +23,8 @@ import warnings ...@@ -23,6 +23,8 @@ import warnings
__all__ = [ __all__ = [
'MetricBase', 'MetricBase',
'CompositeMetric', 'CompositeMetric',
'Precision',
'Recall',
'Accuracy', 'Accuracy',
'ChunkEvaluator', 'ChunkEvaluator',
'EditDistance', 'EditDistance',
...@@ -46,33 +48,34 @@ def _is_number_or_matrix_(var): ...@@ -46,33 +48,34 @@ def _is_number_or_matrix_(var):
class MetricBase(object): class MetricBase(object):
""" """
Base Class for all evaluators Base Class for all Metrics.
MetricBase define a group of interfaces for the
model evaluation methods. Metrics accumulate metric states between
consecutive minibatches, at every minibatch, use update
interface to add current minibatch value to global states.
Use eval to compute accumative metric value from last reset()
or from scratch on.
If you need to custom a new metric, please inherit from MetricBase and
custom implementation.
Args: Args:
name(str): The name of evaluator. such as, "accuracy". Used for generate name(str): The name of metric instance. such as, "accuracy".
temporary variable name. It needed if you want to distinct different metrics in a model.
Interface:
Note(*) : the states is the attributes who not has _ prefix.
get_config(): print current states and configuration
reset(): clear the states. If the Metrics states type is not (int, float, np.ndarray),
Please override this method.
update(): update states at every minibatch
eval(): get metric evaluation in numpy type.
""" """
def __init__(self, name, **kwargs): def __init__(self, name):
self._name = str(name) if name != None else self.__class__.__name__ self._name = str(name) if name != None else self.__class__.__name__
self._kwargs = kwargs if kwargs != None else dict()
self.reset()
def __str__(self): def __str__(self):
return self._name return self._name
def reset(self): def reset(self):
""" """
states is the attributes who not has _ prefix. reset clear the states of metrics. By default, the states
reset the states of metrics. are the members who do not has _ prefix, reset set them to inital states.
If you violate the implicit name rule, please also custom the reset
interface.
""" """
states = { states = {
attr: value attr: value
...@@ -90,61 +93,231 @@ class MetricBase(object): ...@@ -90,61 +93,231 @@ class MetricBase(object):
setattr(self, attr, None) setattr(self, attr, None)
def get_config(self): def get_config(self):
"""
Get the metric and current states.
The states are the members who do not has "_" prefix.
Args:
None
Returns:
dict: a dict of metric and states
"""
states = { states = {
attr: value attr: value
for attr, value in self.__dict__.iteritems() for attr, value in self.__dict__.iteritems()
if not attr.startswith("_") if not attr.startswith("_")
} }
config = copy.deepcopy(self._kwargs) config = {}
config.update({"name": self._name, "states": copy.deepcopy(states)}) config.update({"name": self._name, "states": copy.deepcopy(states)})
return config return config
def update(self): def update(self, preds, labels):
raise NotImplementedError() """
Updates the metric states at every minibatch.
One user can compute the minibatch metric via pure Python, or
via a c++ operator.
Args:
preds(numpy.array): the predictions of current minibatch
labels(numpy.array): the labels of current minibatch, if the label is one-hot
or soft-label, should custom the corresponding update rule.
"""
raise NotImplementedError(
"Should not use it directly, please extend it.")
def eval(self): def eval(self):
raise NotImplementedError() """
Evalute the current metrics based the accumulated states.
Returns:
float|list(float)|numpy.array: the metrics via Python.
"""
raise NotImplementedError(
"Should not use it directly, please extend it.")
class CompositeMetric(MetricBase): class CompositeMetric(MetricBase):
""" """
Compute multiple metrics in each minibatch. Composite multiple metrics in one instance.
for example, merge F1, accuracy, recall into one Metric. for example, merge F1, accuracy, recall into one Metric.
Examples:
.. code-block:: python
labels = fluid.layers.data(name="data", shape=[1], dtype="int32")
data = fluid.layers.data(name="data", shape=[32, 32], dtype="int32")
pred = fluid.layers.fc(input=data, size=1000, act="tanh")
comp = fluid.metrics.CompositeMetric()
acc = fluid.metrics.Precision()
recall = fluid.metrics.Recall()
comp.add_metric(acc)
comp.add_metric(recall)
for pass in range(PASSES):
comp.reset()
for data in train_reader():
loss, preds, labels = exe.run(fetch_list=[cost, preds, labels])
comp.update(preds=preds, labels=labels)
numpy_acc, numpy_recall = comp.eval()
""" """
def __init__(self, name=None, **kwargs): def __init__(self, name=None):
super(CompositeMetric, self).__init__(name, kwargs) super(CompositeMetric, self).__init__(name)
self._metrics = [] self._metrics = []
def add_metric(self, metric): def add_metric(self, metric):
"""
add one metric instance to CompositeMetric.
Args:
metric: a instance of MetricBase.
"""
if not isinstance(metric, MetricBase): if not isinstance(metric, MetricBase):
raise ValueError("SubMetric should be inherit from MetricBase.") raise ValueError("SubMetric should be inherit from MetricBase.")
self._metrics.append(metric) self._metrics.append(metric)
def update(self, preds, labels):
"""
Update every metrics in sequence.
Args:
preds(numpy.array): the predictions of current minibatch
labels(numpy.array): the labels of current minibatch, if the label is one-hot
or soft-label, should custom the corresponding update rule.
"""
for m in self._metrics:
ans.append(m.update(preds, labels))
def eval(self): def eval(self):
"""
Evaluate every metrics in sequence.
Returns:
list(float|numpy.array): a list of metrics value in Python.
"""
ans = [] ans = []
for m in self._metrics: for m in self._metrics:
ans.append(m.eval()) ans.append(m.eval())
return ans return ans
class Precision(MetricBase):
"""
Precision (also called positive predictive value) is the fraction of
relevant instances among the retrieved instances.
https://en.wikipedia.org/wiki/Evaluation_of_binary_classifiers
Note Precision is different with Accuracy in binary classifiers.
accuracy = true positive / total instances
precision = true positive / all positive instance
Examples:
.. code-block:: python
metric = fluid.metrics.Precision()
for pass in range(PASSES):
metric.reset()
for data in train_reader():
loss, preds, labels = exe.run(fetch_list=[cost, preds, labels])
metric.update(preds=preds, labels=labels)
numpy_precision = metric.eval()
"""
def __init__(self, name=None):
super(Precision, self).__init__(name)
self.tp = 0 # true positive
self.fp = 0 # false positive
def update(self, preds, labels):
if not _is_numpy_(preds):
raise ValueError("The 'preds' must be a numpy ndarray.")
if not _is_numpy_(labels):
raise ValueError("The 'labels' must be a numpy ndarray.")
sample_num = labels[0]
for i in range(sample_num):
pred = preds[i].astype("int32")
label = labels[i]
if label == 1:
if pred == label:
self.tp += 1
else:
self.fp += 1
def eval(self):
ap = self.tp + self.fp
return float(self.tp) / ap if ap != 0 else .0
class Recall(MetricBase):
"""
Recall (also known as sensitivity) is the fraction of
relevant instances that have been retrieved over the
total amount of relevant instances
https://en.wikipedia.org/wiki/Precision_and_recall
Examples:
.. code-block:: python
metric = fluid.metrics.Recall()
for pass in range(PASSES):
metric.reset()
for data in train_reader():
loss, preds, labels = exe.run(fetch_list=[cost, preds, labels])
metric.update(preds=preds, labels=labels)
numpy_recall = metric.eval()
"""
def __init__(self, name=None):
super(Recall, self).__init__(name)
self.tp = 0 # true positive
self.fn = 0 # false negtive
def update(self, preds, labels):
if not _is_numpy_(preds):
raise ValueError("The 'preds' must be a numpy ndarray.")
if not _is_numpy_(labels):
raise ValueError("The 'labels' must be a numpy ndarray.")
sample_num = labels[0]
for i in range(sample_num):
pred = preds[i].astype("int32")
label = labels[i]
if label == 1:
if pred == label:
self.tp += 1
else:
if pred != label:
self.fn += 1
def eval(self):
recall = self.tp + self.fn
return float(self.tp) / recall if recall != 0 else .0
class Accuracy(MetricBase): class Accuracy(MetricBase):
""" """
Accumulate the accuracy from minibatches and compute the average accuracy Accumulate the accuracy from minibatches and compute the average accuracy
for every pass. for every pass.
https://en.wikipedia.org/wiki/Accuracy_and_precision
Args: Args:
name: the metrics name name: the metrics name
Example: Examples:
minibatch_accuracy = fluid.layers.accuracy(pred, label) .. code-block:: python
accuracy_evaluator = fluid.metrics.Accuracy()
for epoch in PASS_NUM: labels = fluid.layers.data(name="data", shape=[1], dtype="int32")
accuracy_evaluator.reset() data = fluid.layers.data(name="data", shape=[32, 32], dtype="int32")
for data in batches: pred = fluid.layers.fc(input=data, size=1000, act="tanh")
loss = exe.run(fetch_list=[cost, minibatch_accuracy]) minibatch_accuracy = fluid.layers.accuracy(pred, label)
accuracy_evaluator.update(value=minibatch_accuracy, weight=batches) accuracy_evaluator = fluid.metrics.Accuracy()
accuracy = accuracy_evaluator.eval() for pass in range(PASSES):
accuracy_evaluator.reset()
for data in train_reader():
batch_size = data[0]
loss = exe.run(fetch_list=[cost, minibatch_accuracy])
accuracy_evaluator.update(value=minibatch_accuracy, weight=batch_size)
numpy_acc = accuracy_evaluator.eval()
""" """
def __init__(self, name=None): def __init__(self, name=None):
...@@ -153,6 +326,13 @@ class Accuracy(MetricBase): ...@@ -153,6 +326,13 @@ class Accuracy(MetricBase):
self.weight = .0 self.weight = .0
def update(self, value, weight): def update(self, value, weight):
"""
Update minibatch states.
Args:
value(float|numpy.array): accuracy of one minibatch.
weight(int|float): batch size.
"""
if not _is_number_or_matrix_(value): if not _is_number_or_matrix_(value):
raise ValueError( raise ValueError(
"The 'value' must be a number(int, float) or a numpy ndarray.") "The 'value' must be a number(int, float) or a numpy ndarray.")
...@@ -163,9 +343,8 @@ class Accuracy(MetricBase): ...@@ -163,9 +343,8 @@ class Accuracy(MetricBase):
def eval(self): def eval(self):
if self.weight == 0: if self.weight == 0:
raise ValueError( raise ValueError("There is no data in Accuracy Metrics. \
"There is no data in Accuracy Metrics. Please check layers.accuracy output has added to Accuracy." Please check layers.accuracy output has added to Accuracy.")
)
return self.value / self.weight return self.value / self.weight
...@@ -174,6 +353,25 @@ class ChunkEvaluator(MetricBase): ...@@ -174,6 +353,25 @@ class ChunkEvaluator(MetricBase):
Accumulate counter numbers output by chunk_eval from mini-batches and Accumulate counter numbers output by chunk_eval from mini-batches and
compute the precision recall and F1-score using the accumulated counter compute the precision recall and F1-score using the accumulated counter
numbers. numbers.
For some basics of chunking, please refer to
'Chunking with Support Vector Machines <https://aclanthology.info/pdf/N/N01/N01-1025.pdf>'.
ChunkEvalEvaluator computes the precision, recall, and F1-score of chunk detection,
and supports IOB, IOE, IOBES and IO (also known as plain) tagging schemes.
Examples:
.. code-block:: python
labels = fluid.layers.data(name="data", shape=[1], dtype="int32")
data = fluid.layers.data(name="data", shape=[32, 32], dtype="int32")
pred = fluid.layers.fc(input=data, size=1000, act="tanh")
precision, recall, f1_score, num_infer_chunks, num_label_chunks, num_correct_chunks = layers.chunk_eval(
input=pred,
label=label)
metric = fluid.metrics.ChunkEvaluator()
for data in train_reader():
loss, preds, labels = exe.run(fetch_list=[cost, preds, labels])
metric.update(num_infer_chunks, num_label_chunks, num_correct_chunks)
numpy_precision, numpy_recall, numpy_f1 = metric.eval()
""" """
def __init__(self, name=None): def __init__(self, name=None):
...@@ -183,9 +381,17 @@ class ChunkEvaluator(MetricBase): ...@@ -183,9 +381,17 @@ class ChunkEvaluator(MetricBase):
self.num_correct_chunks = 0 self.num_correct_chunks = 0
def update(self, num_infer_chunks, num_label_chunks, num_correct_chunks): def update(self, num_infer_chunks, num_label_chunks, num_correct_chunks):
"""
Update the states based on the layers.chunk_eval() ouputs.
Args:
num_infer_chunks(int|numpy.array): The number of chunks in Inference on the given minibatch.
num_label_chunks(int|numpy.array): The number of chunks in Label on the given mini-batch.
num_correct_chunks(int|float|numpy.array): The number of chunks both in Inference and Label on the
given mini-batch.
"""
if not _is_number_or_matrix_(num_infer_chunks): if not _is_number_or_matrix_(num_infer_chunks):
raise ValueError( raise ValueError(
"The 'num_infer_chunks' must be a number(int, float) or a numpy ndarray." "The 'num_infer_chunks' must be a number(int) or a numpy ndarray."
) )
if not _is_number_or_matrix_(num_label_chunks): if not _is_number_or_matrix_(num_label_chunks):
raise ValueError( raise ValueError(
...@@ -212,21 +418,28 @@ class ChunkEvaluator(MetricBase): ...@@ -212,21 +418,28 @@ class ChunkEvaluator(MetricBase):
class EditDistance(MetricBase): class EditDistance(MetricBase):
""" """
Edit distance is a way of quantifying how dissimilar two strings
(e.g., words) are to one another by counting the minimum number
of operations required to transform one string into the other.
Refer to https://en.wikipedia.org/wiki/Edit_distance
Accumulate edit distance sum and sequence number from mini-batches and Accumulate edit distance sum and sequence number from mini-batches and
compute the average edit_distance and instance error of all batches. compute the average edit_distance and instance error of all batches.
Args: Args:
name: the metrics name name: the metrics name
Example: Examples:
edit_distance_metrics = fluid.layers.edit_distance(input, label) .. code-block:: python
distance_evaluator = fluid.metrics.EditDistance()
for epoch in PASS_NUM: distances, seq_num = fluid.layers.edit_distance(input, label)
distance_evaluator.reset() distance_evaluator = fluid.metrics.EditDistance()
for data in batches: for epoch in PASS_NUM:
loss = exe.run(fetch_list=[cost] + list(edit_distance_metrics)) distance_evaluator.reset()
distance_evaluator.update(*edit_distance_metrics) for data in batches:
distance, instance_error = distance_evaluator.eval() loss = exe.run(fetch_list=[cost] + list(edit_distance_metrics))
distance_evaluator.update(distances, seq_num)
distance, instance_error = distance_evaluator.eval()
In the above example: In the above example:
'distance' is the average of the edit distance in a pass. 'distance' is the average of the edit distance in a pass.
...@@ -264,16 +477,38 @@ class EditDistance(MetricBase): ...@@ -264,16 +477,38 @@ class EditDistance(MetricBase):
class DetectionMAP(MetricBase): class DetectionMAP(MetricBase):
""" """
Calculate the detection mean average precision (mAP). Calculate the detection mean average precision (mAP).
mAP is the metric to measure the accuracy of object detectors
TODO (Dang Qingqing): update the following doc. like Faster R-CNN, SSD, etc.
The general steps are as follows: It is the average of the maximum precisions at different recall values.
1. calculate the true positive and false positive according to the input
of detection and labels.
2. calculate mAP value, support two versions: '11 point' and 'integral'.
Please get more information from the following articles: Please get more information from the following articles:
https://sanchom.wordpress.com/tag/average-precision/ https://sanchom.wordpress.com/tag/average-precision/
https://arxiv.org/abs/1512.02325 https://arxiv.org/abs/1512.02325
The general steps are as follows:
1. calculate the true positive and false positive according to the input
of detection and labels.
2. calculate mAP value, support two versions: '11 point' and 'integral'.
Examples:
.. code-block:: python
pred = fluid.layers.fc(input=data, size=1000, act="tanh")
batch_map = layers.detection_map(
input,
label,
class_num,
background_label,
overlap_threshold=overlap_threshold,
evaluate_difficult=evaluate_difficult,
ap_version=ap_version)
metric = fluid.metrics.DetectionMAP()
for data in train_reader():
loss, preds, labels = exe.run(fetch_list=[cost, batch_map])
batch_size = data[0]
metric.update(value=batch_map, weight=batch_size)
numpy_map = metric.eval()
""" """
def __init__(self, name=None): def __init__(self, name=None):
...@@ -302,17 +537,18 @@ class DetectionMAP(MetricBase): ...@@ -302,17 +537,18 @@ class DetectionMAP(MetricBase):
class Auc(MetricBase): class Auc(MetricBase):
""" """
Auc Metrics which adapts to binary classification. Auc metric adapts to the binary classification.
Need to note that auc metrics compute the value via Python natively. Refer to https://en.wikipedia.org/wiki/Receiver_operating_characteristic#Area_under_the_curve
Need to note that auc metric compute the value via Python natively.
If you concern the speed, please use the fluid.layers.auc instead. If you concern the speed, please use the fluid.layers.auc instead.
The `auc` function creates four local variables, `true_positives`, The `auc` function creates four local variables, `true_positives`,
`true_negatives`, `false_positives` and `false_negatives` that are used to `true_negatives`, `false_positives` and `false_negatives` that are used to
compute the AUC. To discretize the AUC curve, a linearly spaced set of compute the AUC. To discretize the AUC curve, a linearly spaced set of
thresholds is used to compute pairs of recall and precision values. The area thresholds is used to compute pairs of recall and precision values. The area
under the ROC-curve is therefore computed using the height of the recall under the ROC-curve is therefore computed using the height of the recall
values by the false positive rate, while the area under the PR-curve is the values by the false positive rate, while the area under the PR-curve is the
computed using the height of the precision values by the recall. computed using the height of the precision values by the recall.
Args: Args:
name: metric name name: metric name
...@@ -322,6 +558,16 @@ class Auc(MetricBase): ...@@ -322,6 +558,16 @@ class Auc(MetricBase):
curve. curve.
"NOTE: only implement the ROC curve type via Python now." "NOTE: only implement the ROC curve type via Python now."
Examples:
.. code-block:: python
pred = fluid.layers.fc(input=data, size=1000, act="tanh")
metric = fluid.metrics.Auc()
for data in train_reader():
loss, preds, labels = exe.run(fetch_list=[cost, preds, labels])
metric.update(preds, labels)
numpy_auc = metric.eval()
""" """
def __init__(self, name, curve='ROC', num_thresholds=200): def __init__(self, name, curve='ROC', num_thresholds=200):
...@@ -334,10 +580,10 @@ class Auc(MetricBase): ...@@ -334,10 +580,10 @@ class Auc(MetricBase):
self.tn_list = np.zeros((num_thresholds, )) self.tn_list = np.zeros((num_thresholds, ))
self.fp_list = np.zeros((num_thresholds, )) self.fp_list = np.zeros((num_thresholds, ))
def update(self, labels, predictions, axis=1): def update(self, preds, labels):
if not _is_numpy_(labels): if not _is_numpy_(labels):
raise ValueError("The 'labels' must be a numpy ndarray.") raise ValueError("The 'labels' must be a numpy ndarray.")
if not _is_numpy_(predictions): if not _is_numpy_(preds):
raise ValueError("The 'predictions' must be a numpy ndarray.") raise ValueError("The 'predictions' must be a numpy ndarray.")
kepsilon = 1e-7 # to account for floating point imprecisions kepsilon = 1e-7 # to account for floating point imprecisions
......
...@@ -15,7 +15,7 @@ if(NOT WITH_DISTRIBUTE) ...@@ -15,7 +15,7 @@ if(NOT WITH_DISTRIBUTE)
endif(NOT WITH_DISTRIBUTE) endif(NOT WITH_DISTRIBUTE)
list(REMOVE_ITEM TEST_OPS test_seq_concat_op) # FIXME(helin): https://github.com/PaddlePaddle/Paddle/issues/8290 list(REMOVE_ITEM TEST_OPS test_seq_concat_op) # FIXME(helin): https://github.com/PaddlePaddle/Paddle/issues/8290
list(REMOVE_ITEM TEST_OPS test_modified_huber_loss_op) # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/5184 list(REMOVE_ITEM TEST_OPS test_modified_huber_loss_op) # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/5184
list(REMOVE_ITEM TEST_OPS test_lstm_unit_op) # # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/5185 list(REMOVE_ITEM TEST_OPS test_lstm_unit_op) # # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/5185
list(REMOVE_ITEM TEST_OPS test_nce) # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/7778 list(REMOVE_ITEM TEST_OPS test_nce) # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/7778
list(REMOVE_ITEM TEST_OPS test_recurrent_op) # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/6152 list(REMOVE_ITEM TEST_OPS test_recurrent_op) # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/6152
...@@ -43,8 +43,6 @@ list(REMOVE_ITEM TEST_OPS test_warpctc_op) ...@@ -43,8 +43,6 @@ list(REMOVE_ITEM TEST_OPS test_warpctc_op)
list(REMOVE_ITEM TEST_OPS test_dist_train) list(REMOVE_ITEM TEST_OPS test_dist_train)
list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf) list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf)
list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed) list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed)
# TODO(wuyi): this test hungs on CI, will add it back later
list(REMOVE_ITEM TEST_OPS test_listen_and_serv_op)
foreach(TEST_OP ${TEST_OPS}) foreach(TEST_OP ${TEST_OPS})
py_test_modules(${TEST_OP} MODULES ${TEST_OP}) py_test_modules(${TEST_OP} MODULES ${TEST_OP})
endforeach(TEST_OP) endforeach(TEST_OP)
...@@ -52,3 +50,4 @@ py_test_modules(test_warpctc_op MODULES test_warpctc_op ENVS FLAGS_warpctc_dir=$ ...@@ -52,3 +50,4 @@ py_test_modules(test_warpctc_op MODULES test_warpctc_op ENVS FLAGS_warpctc_dir=$
py_test_modules(test_dist_train MODULES test_dist_train SERIAL) py_test_modules(test_dist_train MODULES test_dist_train SERIAL)
py_test_modules(test_parallel_executor_crf MODULES test_parallel_executor_crf SERIAL) py_test_modules(test_parallel_executor_crf MODULES test_parallel_executor_crf SERIAL)
py_test_modules(test_parallel_executor_fetch_feed MODULES test_parallel_executor_fetch_feed SERIAL) py_test_modules(test_parallel_executor_fetch_feed MODULES test_parallel_executor_fetch_feed SERIAL)
set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 20)
...@@ -94,7 +94,7 @@ class TestListenAndServOp(OpTest): ...@@ -94,7 +94,7 @@ class TestListenAndServOp(OpTest):
self._wait_ps_ready(p1.pid) self._wait_ps_ready(p1.pid)
# raise SIGTERM to pserver # raise SIGTERM to pserver
os.kill(p1.pid, signal.SIGKILL) os.kill(p1.pid, signal.SIGINT)
p1.join() p1.join()
# run pserver on CPU in async mode # run pserver on CPU in async mode
...@@ -102,7 +102,7 @@ class TestListenAndServOp(OpTest): ...@@ -102,7 +102,7 @@ class TestListenAndServOp(OpTest):
self._wait_ps_ready(p2.pid) self._wait_ps_ready(p2.pid)
# raise SIGTERM to pserver # raise SIGTERM to pserver
os.kill(p2.pid, signal.SIGKILL) os.kill(p2.pid, signal.SIGTERM)
p2.join() p2.join()
......
...@@ -396,7 +396,7 @@ class DistributeTranspiler(object): ...@@ -396,7 +396,7 @@ class DistributeTranspiler(object):
return varname return varname
return "" return ""
def __clone_lr_op_sub_block__(op, program, new_block): def __clone_lr_op_sub_block__(op, program, lr_block):
if not op.has_attr('sub_block'): if not op.has_attr('sub_block'):
return return
...@@ -405,36 +405,41 @@ class DistributeTranspiler(object): ...@@ -405,36 +405,41 @@ class DistributeTranspiler(object):
assert isinstance(origin_block, Block) assert isinstance(origin_block, Block)
# we put the new sub block to new block to follow the block # we put the new sub block to new block to follow the block
# hierarchy of the original blocks # hierarchy of the original blocks
new_sub_block = program.create_block(new_block.idx) new_sub_block = program.create_block(lr_block.idx)
# clone vars # clone vars
for var in origin_block.vars: for var in origin_block.vars:
new_sub_block.clone_variable(var) new_sub_block.clone_variable(var)
# clone ops # clone ops
for op in origin_block.ops: for origin_op in origin_block.ops:
self._clone_lr_op(program, new_sub_block, op) cloned_op = self._clone_lr_op(program, new_sub_block, origin_op)
# clone sub_block of op # clone sub_block of op
__clone_lr_op_sub_block__(op, program, new_sub_block) __clone_lr_op_sub_block__(cloned_op, program, new_sub_block)
# reset the block of op # reset the block of op
op.set_attr('sub_block', new_sub_block) op.set_attr('sub_block', new_sub_block)
# append lr decay ops to the child block if exists # append lr decay ops to the child block if exists
lr_ops = self._get_lr_ops() lr_ops = self._get_lr_ops()
# record optimize blocks and we can run them on pserver parallel
optimize_blocks = []
if len(lr_ops) > 0: if len(lr_ops) > 0:
lr_decay_block = pserver_program.create_block( lr_decay_block = pserver_program.create_block(
pserver_program.num_blocks - 1) pserver_program.num_blocks - 1)
optimize_blocks.append(lr_decay_block)
for _, op in enumerate(lr_ops): for _, op in enumerate(lr_ops):
self._append_pserver_non_opt_ops(lr_decay_block, op) cloned_op = self._append_pserver_non_opt_ops(lr_decay_block, op)
# append sub blocks to pserver_program in lr_decay_op # append sub blocks to pserver_program in lr_decay_op
__clone_lr_op_sub_block__(op, pserver_program, lr_decay_block) __clone_lr_op_sub_block__(cloned_op, pserver_program,
lr_decay_block)
# append op to the current block # append op to the current block
grad_to_block_id = [] grad_to_block_id = []
pre_block_idx = pserver_program.num_blocks - 1 pre_block_idx = pserver_program.num_blocks - 1
for idx, opt_op in enumerate(opt_op_on_pserver): for idx, opt_op in enumerate(opt_op_on_pserver):
per_opt_block = pserver_program.create_block(pre_block_idx) per_opt_block = pserver_program.create_block(pre_block_idx)
optimize_blocks.append(per_opt_block)
# append grad merging ops before clip and weight decay # append grad merging ops before clip and weight decay
for _, op in enumerate(self.optimize_ops): for _, op in enumerate(self.optimize_ops):
# find the origin @GRAD var before clipping # find the origin @GRAD var before clipping
...@@ -453,6 +458,7 @@ class DistributeTranspiler(object): ...@@ -453,6 +458,7 @@ class DistributeTranspiler(object):
if global_ops: if global_ops:
opt_state_block = pserver_program.create_block( opt_state_block = pserver_program.create_block(
pserver_program.num_blocks - 1) pserver_program.num_blocks - 1)
optimize_blocks.append(opt_state_block)
for glb_op in global_ops: for glb_op in global_ops:
__append_optimize_op__(glb_op, opt_state_block, __append_optimize_op__(glb_op, opt_state_block,
grad_to_block_id, None) grad_to_block_id, None)
...@@ -476,11 +482,11 @@ class DistributeTranspiler(object): ...@@ -476,11 +482,11 @@ class DistributeTranspiler(object):
assert len(prefetch_var_name_to_block_id) == 0 assert len(prefetch_var_name_to_block_id) == 0
attrs = { attrs = {
"OptimizeBlock": pserver_program.block(1), "optimize_blocks": optimize_blocks,
"endpoint": endpoint, "endpoint": endpoint,
"Fanin": self.trainer_num, "Fanin": self.trainer_num,
"sync_mode": self.sync_mode, "sync_mode": self.sync_mode,
"grad_to_block_id": grad_to_block_id "grad_to_block_id": grad_to_block_id,
} }
if len(prefetch_var_name_to_block_id) > 0: if len(prefetch_var_name_to_block_id) > 0:
attrs['prefetch_var_name_to_block_id'] \ attrs['prefetch_var_name_to_block_id'] \
...@@ -1237,7 +1243,7 @@ class DistributeTranspiler(object): ...@@ -1237,7 +1243,7 @@ class DistributeTranspiler(object):
if var not in program.global_block().vars: if var not in program.global_block().vars:
block.clone_variable(var) block.clone_variable(var)
block.append_op( return block.append_op(
type=op.type, inputs=inputs, outputs=outputs, attrs=op.attrs) type=op.type, inputs=inputs, outputs=outputs, attrs=op.attrs)
def _append_pserver_non_opt_ops(self, optimize_block, opt_op): def _append_pserver_non_opt_ops(self, optimize_block, opt_op):
...@@ -1275,7 +1281,7 @@ class DistributeTranspiler(object): ...@@ -1275,7 +1281,7 @@ class DistributeTranspiler(object):
elif not program.global_block().vars.has_key(var.name): elif not program.global_block().vars.has_key(var.name):
program.global_block().clone_variable(var) program.global_block().clone_variable(var)
optimize_block.append_op( return optimize_block.append_op(
type=opt_op.type, type=opt_op.type,
inputs=inputs, inputs=inputs,
outputs=outputs, outputs=outputs,
......
...@@ -43,7 +43,7 @@ CIFAR100_URL = URL_PREFIX + 'cifar-100-python.tar.gz' ...@@ -43,7 +43,7 @@ CIFAR100_URL = URL_PREFIX + 'cifar-100-python.tar.gz'
CIFAR100_MD5 = 'eb9058c3a382ffc7106e4002c42a8d85' CIFAR100_MD5 = 'eb9058c3a382ffc7106e4002c42a8d85'
def reader_creator(filename, sub_name): def reader_creator(filename, sub_name, cycle=False):
def read_batch(batch): def read_batch(batch):
data = batch['data'] data = batch['data']
labels = batch.get('labels', batch.get('fine_labels', None)) labels = batch.get('labels', batch.get('fine_labels', None))
...@@ -56,10 +56,13 @@ def reader_creator(filename, sub_name): ...@@ -56,10 +56,13 @@ def reader_creator(filename, sub_name):
names = (each_item.name for each_item in f names = (each_item.name for each_item in f
if sub_name in each_item.name) if sub_name in each_item.name)
for name in names: while True:
batch = cPickle.load(f.extractfile(name)) for name in names:
for item in read_batch(batch): batch = cPickle.load(f.extractfile(name))
yield item for item in read_batch(batch):
yield item
if not cycle:
break
return reader return reader
...@@ -94,34 +97,40 @@ def test100(): ...@@ -94,34 +97,40 @@ def test100():
'test') 'test')
def train10(): def train10(cycle=False):
""" """
CIFAR-10 training set creator. CIFAR-10 training set creator.
It returns a reader creator, each sample in the reader is image pixels in It returns a reader creator, each sample in the reader is image pixels in
[0, 1] and label in [0, 9]. [0, 1] and label in [0, 9].
:param cycle: whether to cycle through the dataset
:type cycle: bool
:return: Training reader creator :return: Training reader creator
:rtype: callable :rtype: callable
""" """
return reader_creator( return reader_creator(
paddle.v2.dataset.common.download(CIFAR10_URL, 'cifar', CIFAR10_MD5), paddle.v2.dataset.common.download(CIFAR10_URL, 'cifar', CIFAR10_MD5),
'data_batch') 'data_batch',
cycle=cycle)
def test10(): def test10(cycle=False):
""" """
CIFAR-10 test set creator. CIFAR-10 test set creator.
It returns a reader creator, each sample in the reader is image pixels in It returns a reader creator, each sample in the reader is image pixels in
[0, 1] and label in [0, 9]. [0, 1] and label in [0, 9].
:param cycle: whether to cycle through the dataset
:type cycle: bool
:return: Test reader creator. :return: Test reader creator.
:rtype: callable :rtype: callable
""" """
return reader_creator( return reader_creator(
paddle.v2.dataset.common.download(CIFAR10_URL, 'cifar', CIFAR10_MD5), paddle.v2.dataset.common.download(CIFAR10_URL, 'cifar', CIFAR10_MD5),
'test_batch') 'test_batch',
cycle=cycle)
def fetch(): def fetch():
......
...@@ -76,7 +76,8 @@ def reader_creator(data_file, ...@@ -76,7 +76,8 @@ def reader_creator(data_file,
dataset_name, dataset_name,
mapper, mapper,
buffered_size=1024, buffered_size=1024,
use_xmap=True): use_xmap=True,
cycle=False):
''' '''
1. read images from tar file and 1. read images from tar file and
merge images into batch files in 102flowers.tgz_batch/ merge images into batch files in 102flowers.tgz_batch/
...@@ -96,6 +97,8 @@ def reader_creator(data_file, ...@@ -96,6 +97,8 @@ def reader_creator(data_file,
:type mapper: callable :type mapper: callable
:param buffered_size: the size of buffer used to process images :param buffered_size: the size of buffer used to process images
:type buffered_size: int :type buffered_size: int
:param cycle: whether to cycle through the dataset
:type cycle: bool
:return: data reader :return: data reader
:rtype: callable :rtype: callable
''' '''
...@@ -108,15 +111,18 @@ def reader_creator(data_file, ...@@ -108,15 +111,18 @@ def reader_creator(data_file,
file_list = batch_images_from_tar(data_file, dataset_name, img2label) file_list = batch_images_from_tar(data_file, dataset_name, img2label)
def reader(): def reader():
for file in open(file_list): while True:
file = file.strip() for file in open(file_list):
batch = None file = file.strip()
with open(file, 'r') as f: batch = None
batch = cPickle.load(f) with open(file, 'r') as f:
data = batch['data'] batch = cPickle.load(f)
labels = batch['label'] data = batch['data']
for sample, label in itertools.izip(data, batch['label']): labels = batch['label']
yield sample, int(label) - 1 for sample, label in itertools.izip(data, batch['label']):
yield sample, int(label) - 1
if not cycle:
break
if use_xmap: if use_xmap:
cpu_num = int(os.environ.get('CPU_NUM', cpu_count())) cpu_num = int(os.environ.get('CPU_NUM', cpu_count()))
...@@ -125,7 +131,7 @@ def reader_creator(data_file, ...@@ -125,7 +131,7 @@ def reader_creator(data_file,
return map_readers(mapper, reader) return map_readers(mapper, reader)
def train(mapper=train_mapper, buffered_size=1024, use_xmap=True): def train(mapper=train_mapper, buffered_size=1024, use_xmap=True, cycle=False):
''' '''
Create flowers training set reader. Create flowers training set reader.
It returns a reader, each sample in the reader is It returns a reader, each sample in the reader is
...@@ -138,17 +144,23 @@ def train(mapper=train_mapper, buffered_size=1024, use_xmap=True): ...@@ -138,17 +144,23 @@ def train(mapper=train_mapper, buffered_size=1024, use_xmap=True):
:type mapper: callable :type mapper: callable
:param buffered_size: the size of buffer used to process images :param buffered_size: the size of buffer used to process images
:type buffered_size: int :type buffered_size: int
:param cycle: whether to cycle through the dataset
:type cycle: bool
:return: train data reader :return: train data reader
:rtype: callable :rtype: callable
''' '''
return reader_creator( return reader_creator(
download(DATA_URL, 'flowers', DATA_MD5), download(DATA_URL, 'flowers', DATA_MD5),
download(LABEL_URL, 'flowers', LABEL_MD5), download(LABEL_URL, 'flowers', LABEL_MD5),
download(SETID_URL, 'flowers', SETID_MD5), TRAIN_FLAG, mapper, download(SETID_URL, 'flowers', SETID_MD5),
buffered_size, use_xmap) TRAIN_FLAG,
mapper,
buffered_size,
use_xmap,
cycle=cycle)
def test(mapper=test_mapper, buffered_size=1024, use_xmap=True): def test(mapper=test_mapper, buffered_size=1024, use_xmap=True, cycle=False):
''' '''
Create flowers test set reader. Create flowers test set reader.
It returns a reader, each sample in the reader is It returns a reader, each sample in the reader is
...@@ -161,14 +173,20 @@ def test(mapper=test_mapper, buffered_size=1024, use_xmap=True): ...@@ -161,14 +173,20 @@ def test(mapper=test_mapper, buffered_size=1024, use_xmap=True):
:type mapper: callable :type mapper: callable
:param buffered_size: the size of buffer used to process images :param buffered_size: the size of buffer used to process images
:type buffered_size: int :type buffered_size: int
:param cycle: whether to cycle through the dataset
:type cycle: bool
:return: test data reader :return: test data reader
:rtype: callable :rtype: callable
''' '''
return reader_creator( return reader_creator(
download(DATA_URL, 'flowers', DATA_MD5), download(DATA_URL, 'flowers', DATA_MD5),
download(LABEL_URL, 'flowers', LABEL_MD5), download(LABEL_URL, 'flowers', LABEL_MD5),
download(SETID_URL, 'flowers', SETID_MD5), TEST_FLAG, mapper, download(SETID_URL, 'flowers', SETID_MD5),
buffered_size, use_xmap) TEST_FLAG,
mapper,
buffered_size,
use_xmap,
cycle=cycle)
def valid(mapper=test_mapper, buffered_size=1024, use_xmap=True): def valid(mapper=test_mapper, buffered_size=1024, use_xmap=True):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册