Commit 928418a9 authored by T typhoonzero

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into gen_nccl_id_op

......@@ -155,7 +155,7 @@ into offsets
3 2+3 4+5 1+9 2+10 3+12
```
so we know that the first sentence is from word 0 to word 3, and the second sentence from work 3 to word 5.
so we know that the first sentence is from word 0 to word 3, and the second sentence from word 3 to word 5.
Similarly, the lengths in the top level LoD
......
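The lengths-to-offsets conversion shown above is a plain prefix sum. A minimal Python sketch (the helper name `lengths_to_offsets` is hypothetical, for illustration only):

```python
def lengths_to_offsets(lengths):
    # Hypothetical helper: prefix-sum per-sequence lengths into LoD offsets.
    offsets = [0]
    for n in lengths:
        offsets.append(offsets[-1] + n)
    return offsets

# lengths [3, 2, 4, 1, 2, 3] -> offsets [0, 3, 5, 9, 10, 12]:
# the first sentence spans words [0, 3), the second spans [3, 5), etc.
print(lengths_to_offsets([3, 2, 4, 1, 2, 3]))
```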
......@@ -37,20 +37,26 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder(
const std::string &loss_var_name,
const std::unordered_set<std::string> &params,
const std::vector<Scope *> &local_scopes,
platform::NCCLContextMap *nccl_ctxs, bool use_default_grad_scale)
platform::NCCLContextMap *nccl_ctxs, bool use_default_grad_scale,
bool balance_parameter_opt_between_cards)
: loss_var_name_(loss_var_name),
places_(places),
local_scopes_(local_scopes),
nccl_ctxs_(nccl_ctxs) {
nccl_ctxs_(nccl_ctxs),
balance_parameter_opt_between_cards_(
balance_parameter_opt_between_cards) {
#else
MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder(
const std::vector<platform::Place> &places,
const std::string &loss_var_name,
const std::unordered_set<std::string> &params,
const std::vector<Scope *> &local_scopes, bool use_default_grad_scale)
const std::vector<Scope *> &local_scopes, bool use_default_grad_scale,
bool balance_parameter_opt_between_cards)
: loss_var_name_(loss_var_name),
places_(places),
local_scopes_(local_scopes) {
local_scopes_(local_scopes),
balance_parameter_opt_between_cards_(
balance_parameter_opt_between_cards) {
#endif
for (auto &p : params) {
grad_names_.insert(GradVarName(p));
......@@ -124,6 +130,12 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
// Find "send" op first for split is in front of send.
OpDesc *send_op = GetSendOpDesc(program);
size_t cur_device_id = 0;
std::vector<std::unordered_set<std::string>> var_name_on_devices;
std::vector<std::unordered_set<std::string>> bcast_var_name_set;
var_name_on_devices.resize(places_.size());
bcast_var_name_set.resize(places_.size());
bool is_forwarding = true;
for (auto *op : program.Block(0).AllOps()) {
if (op->Type() == "send") {
......@@ -139,12 +151,27 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
}
is_forwarding = false;
} else {
int op_dev_id = GetOpDeviceID(var_name_on_devices, *op);
if (op_dev_id == -1) { // var on all devices
CreateComputationalOps(&result, *op, places_.size());
} else {
CreateComputationalOp(&result, *op, op_dev_id);
for (auto &var_name : op->OutputArgumentNames()) {
var_name_on_devices[op_dev_id].emplace(var_name);
}
}
if (!is_forwarding && places_.size() > 1) {
// Currently, we assume that once a gradient is generated, it can be
// broadcast, and each gradient is broadcast only once.
for (auto &og : op->OutputArgumentNames()) {
if (IsParameterGradientOnce(og, &og_has_been_broadcast)) {
if (balance_parameter_opt_between_cards_) {
CreateReduceOp(&result, og, cur_device_id);
var_name_on_devices[cur_device_id].emplace(og);
bcast_var_name_set[cur_device_id].emplace(
og.substr(0, og.size() - strlen(kGradVarSuffix)));
cur_device_id = (cur_device_id + 1) % places_.size();
} else {
if (IsSparseGradient(var_types, og)) {
CreateReduceOp(&result, og, 0);
CreateBroadcastOp(&result, og, 0);
......@@ -156,7 +183,15 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
}
}
}
}
// Insert BCast Ops
for (size_t dev_id = 0; dev_id < bcast_var_name_set.size(); ++dev_id) {
auto &to_bcast_set = bcast_var_name_set[dev_id];
for (auto &bcast_name : to_bcast_set) {
CreateBroadcastOp(&result, bcast_name, dev_id);
}
}
/*
Dependency graph has been constructed. However, there are still data
hazards that need to be handled.
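The round-robin placement in the `balance_parameter_opt_between_cards_` branch is easier to see in isolation. A minimal Python sketch, assuming `kGradVarSuffix` is `"@GRAD"` (the helper names here are illustrative, not the real SSAGraph API):

```python
def assign_grads_round_robin(grads, num_devices):
    # Each parameter gradient is reduced onto one device, chosen round-robin;
    # the optimizer runs there, and the updated parameter is broadcast back.
    reduce_plan = {}  # grad name -> device id owning its reduce + optimize
    bcast_plan = [[] for _ in range(num_devices)]  # device -> params to bcast
    cur_device_id = 0
    for og in grads:
        reduce_plan[og] = cur_device_id
        bcast_plan[cur_device_id].append(og[:-len("@GRAD")])  # strip suffix
        cur_device_id = (cur_device_id + 1) % num_devices
    return reduce_plan, bcast_plan

reduce_plan, bcast_plan = assign_grads_round_robin(
    ["fc_w@GRAD", "fc_b@GRAD", "emb@GRAD"], num_devices=2)
print(reduce_plan)  # {'fc_w@GRAD': 0, 'fc_b@GRAD': 1, 'emb@GRAD': 0}
print(bcast_plan)   # [['fc_w', 'emb'], ['fc_b']]
```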
......@@ -265,6 +300,26 @@ bool MultiDevSSAGraphBuilder::IsParameterGradientOnce(
return is_pg_once;
}
int MultiDevSSAGraphBuilder::GetOpDeviceID(
const std::vector<std::unordered_set<std::string>> &var_name_on_devices,
const OpDesc &op) const {
if (!balance_parameter_opt_between_cards_) {
return -1;
}
int var_dev_id = -1;
for (auto &var_name : op.InputArgumentNames()) {
if (var_dev_id != -1) break;
for (size_t i = 0; i < var_name_on_devices.size(); ++i) {
if (var_name_on_devices[i].count(var_name)) {
var_dev_id = static_cast<int>(i);
break;
}
}
}
return var_dev_id;
}
void MultiDevSSAGraphBuilder::CreateScaleLossGradOp(SSAGraph *result) const {
for (size_t i = 0; i < places_.size(); ++i) {
// Insert ScaleCost OpHandle
......
......@@ -36,13 +36,15 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
const std::unordered_set<std::string> &params,
const std::vector<Scope *> &local_scopes,
platform::NCCLContextMap *nccl_ctxs,
bool use_default_grad_scale);
bool use_default_grad_scale,
bool balance_parameter_opt_between_cards);
#else
MultiDevSSAGraphBuilder(const std::vector<platform::Place> &places,
const std::string &loss_var_name,
const std::unordered_set<std::string> &params,
const std::vector<Scope *> &local_scopes,
bool use_default_grad_scale);
bool use_default_grad_scale,
bool balance_parameter_opt_between_cards);
#endif
std::unique_ptr<SSAGraph> Build(const ProgramDesc &program) const override;
......@@ -60,6 +62,7 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
#ifdef PADDLE_WITH_CUDA
platform::NCCLContextMap *nccl_ctxs_;
#endif
bool balance_parameter_opt_between_cards_;
bool use_default_grad_scale_;
bool IsScaleLossOp(const OpDesc &op) const;
......@@ -84,6 +87,10 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
const std::string &og,
std::unordered_set<std::string> *og_has_been_broadcast) const;
int GetOpDeviceID(
const std::vector<std::unordered_set<std::string>> &var_name_on_devices,
const OpDesc &op) const;
void InsertNCCLAllReduceOp(SSAGraph *result, const std::string &og) const;
void CreateBroadcastOp(SSAGraph *result, const std::string &p_name,
......
......@@ -58,7 +58,8 @@ ParallelExecutor::ParallelExecutor(
const std::unordered_set<std::string> &bcast_vars,
const ProgramDesc &main_program, const std::string &loss_var_name,
Scope *scope, const std::vector<Scope *> &local_scopes, bool allow_op_delay,
bool use_default_grad_scale, size_t num_trainers, size_t trainer_id)
bool use_default_grad_scale, bool balance_parameter_opt_between_cards,
size_t num_trainers, size_t trainer_id)
: member_(new ParallelExecutorPrivate(places)) {
member_->global_scope_ = scope;
......@@ -99,11 +100,12 @@ ParallelExecutor::ParallelExecutor(
#ifdef PADDLE_WITH_CUDA
details::MultiDevSSAGraphBuilder builder(
member_->places_, loss_var_name, params, member_->local_scopes_,
member_->nccl_ctxs_.get(), use_default_grad_scale);
member_->nccl_ctxs_.get(), use_default_grad_scale,
balance_parameter_opt_between_cards);
#else
details::MultiDevSSAGraphBuilder builder(member_->places_, loss_var_name,
params, member_->local_scopes_,
use_default_grad_scale);
details::MultiDevSSAGraphBuilder builder(
member_->places_, loss_var_name, params, member_->local_scopes_,
use_default_grad_scale, balance_parameter_opt_between_cards);
#endif
auto graph = builder.Build(main_program);
......
......@@ -41,6 +41,7 @@ class ParallelExecutor {
const std::string& loss_var_name, Scope* scope,
const std::vector<Scope*>& local_scopes,
bool allow_op_delay, bool use_default_grad_scale,
bool balance_parameter_opt_between_cards,
size_t num_trainers = 0, size_t trainer_id = 0);
~ParallelExecutor();
......
......@@ -276,6 +276,11 @@ foreach(src ${READER_LIBRARY})
set(OP_LIBRARY ${src} ${OP_LIBRARY})
endforeach()
add_subdirectory(detection)
foreach(src ${DETECTION_LIBRARY})
set(OP_LIBRARY ${src} ${OP_LIBRARY})
endforeach()
set(GLOB_OP_LIB ${OP_LIBRARY} CACHE INTERNAL "Global OP library")
cc_test(gather_test SRCS gather_test.cc DEPS tensor)
......
set(LOCAL_DETECTION_LIBS)
function(detection_library TARGET_NAME)
set(oneValueArgs "")
set(multiValueArgs SRCS DEPS)
set(options "")
set(common_deps op_registry)
set(pybind_flag 0)
cmake_parse_arguments(detection_library "${options}" "${oneValueArgs}"
"${multiValueArgs}" ${ARGN})
op_library(${TARGET_NAME} SRCS ${detection_library_SRCS} DEPS ${common_deps} ${detection_library_DEPS})
set(LOCAL_DETECTION_LIBS
${TARGET_NAME}
${LOCAL_DETECTION_LIBS}
PARENT_SCOPE)
endfunction()
detection_library(bipartite_match_op SRCS bipartite_match_op.cc)
detection_library(box_coder_op SRCS box_coder_op.cc box_coder_op.cu)
detection_library(iou_similarity_op SRCS iou_similarity_op.cc
iou_similarity_op.cu)
detection_library(mine_hard_examples_op SRCS mine_hard_examples_op.cc)
detection_library(multiclass_nms_op SRCS multiclass_nms_op.cc)
detection_library(prior_box_op SRCS prior_box_op.cc prior_box_op.cu)
detection_library(target_assign_op SRCS target_assign_op.cc
target_assign_op.cu)
# Export local libraries to parent
set(DETECTION_LIBRARY ${LOCAL_DETECTION_LIBS} PARENT_SCOPE)
......@@ -9,7 +9,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/box_coder_op.h"
#include "paddle/fluid/operators/detection/box_coder_op.h"
namespace paddle {
namespace operators {
......
......@@ -9,7 +9,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/box_coder_op.h"
#include "paddle/fluid/operators/detection/box_coder_op.h"
#include "paddle/fluid/platform/cuda_primitives.h"
namespace paddle {
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/iou_similarity_op.h"
#include "paddle/fluid/operators/detection/iou_similarity_op.h"
namespace paddle {
namespace operators {
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/iou_similarity_op.h"
#include "paddle/fluid/operators/detection/iou_similarity_op.h"
namespace ops = paddle::operators;
REGISTER_OP_CUDA_KERNEL(
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/prior_box_op.h"
#include "paddle/fluid/operators/detection/prior_box_op.h"
namespace paddle {
namespace operators {
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/prior_box_op.h"
#include "paddle/fluid/operators/detection/prior_box_op.h"
namespace paddle {
namespace operators {
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/target_assign_op.h"
#include "paddle/fluid/operators/detection/target_assign_op.h"
namespace paddle {
namespace operators {
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/target_assign_op.h"
#include "paddle/fluid/operators/detection/target_assign_op.h"
namespace paddle {
namespace operators {
......
......@@ -83,8 +83,8 @@ class GenNCCLIdOp : public framework::OperatorBase {
rpc_service_->SetProgram(&empty_program);
rpc_service_->SetExecutor(&executor);
server_thread_.reset(new std::thread(
std::bind(&detail::AsyncGRPCServer::RunSyncUpdate, rpc_service_)));
std::thread server_thread(
std::bind(&detail::AsyncGRPCServer::RunSyncUpdate, rpc_service_));
rpc_service_->SetCond(0);
VLOG(3) << "start getting nccl id from trainer 0...";
auto recv = rpc_service_->Get();
......@@ -92,13 +92,12 @@ class GenNCCLIdOp : public framework::OperatorBase {
rpc_service_->ShutDown();
VLOG(3) << "rpc server stopped";
// TODO(wuyi): reinit nccl communicators
server_thread_->join();
server_thread.join();
delete rpc_service_;
}
protected:
mutable detail::AsyncGRPCServer* rpc_service_ = nullptr;
mutable std::shared_ptr<std::thread> server_thread_;
};
class GenNCCLIdOpMaker : public framework::OpProtoAndCheckerMaker {
......
......@@ -96,10 +96,22 @@ struct CUBlas<platform::float16> {
reinterpret_cast<__half *>(C), ldc));
}
template <typename... ARGS>
static void GEMM_BATCH(ARGS... args) {
static void GEMM_BATCH(cublasHandle_t handle, cublasOperation_t transa,
cublasOperation_t transb, int m, int n, int k,
const float16 *alpha, const float16 *A, int lda,
long long int strideA, const float16 *B, // NOLINT
int ldb, long long int strideB, // NOLINT
const float16 *beta, float16 *C, int ldc,
long long int strideC, // NOLINT
int batchCount) {
#if CUDA_VERSION >= 8000
PADDLE_ENFORCE(platform::dynload::cublasHgemmStridedBatched(args...));
PADDLE_ENFORCE(platform::dynload::cublasHgemmStridedBatched(
handle, transa, transb, m, n, k,
reinterpret_cast<const __half *>(alpha),
reinterpret_cast<const __half *>(A), lda, strideA,
reinterpret_cast<const __half *>(B), ldb, strideB,
reinterpret_cast<const __half *>(beta), reinterpret_cast<__half *>(C),
ldc, strideC, batchCount));
#else
PADDLE_THROW("HgemmStridedBatched is not supported on cuda <= 7.5");
#endif
......
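The explicit signature above makes the stride semantics visible. A NumPy sketch of what `cublasHgemmStridedBatched` computes per batch (row-major for readability, whereas cuBLAS itself is column-major):

```python
import numpy as np

def gemm_strided_batched(A, B, strideA, strideB, M, N, K, batch_count,
                         alpha=1.0, beta=0.0):
    # Batch k multiplies the MxK block at A[k * strideA] with the KxN block
    # at B[k * strideB]; C is laid out contiguously as MxN blocks.
    C = np.zeros(batch_count * M * N, dtype=A.dtype)
    for k in range(batch_count):
        Ak = A[k * strideA:k * strideA + M * K].reshape(M, K)
        Bk = B[k * strideB:k * strideB + K * N].reshape(K, N)
        C[k * M * N:(k + 1) * M * N] = (
            alpha * Ak.dot(Bk).ravel() + beta * C[k * M * N:(k + 1) * M * N])
    return C

A = np.arange(12, dtype=np.float32)  # two 2x3 matrices back to back
B = np.arange(12, dtype=np.float32)  # two 3x2 matrices back to back
print(gemm_strided_batched(A, B, strideA=6, strideB=6,
                           M=2, N=2, K=3, batch_count=2))
```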
......@@ -172,9 +172,9 @@ void Blas<platform::CPUDeviceContext>::BatchedGEMM(
c_array.data(), &ldc, 1 /* group_count */, &batchCount);
#else
for (int k = 0; k < batchCount; ++k) {
const float *Ak = &A[k * strideA];
const float *Bk = &B[k * strideB];
float *Ck = &C[k * M * N];
auto *Ak = &A[k * strideA];
auto *Bk = &B[k * strideB];
auto *Ck = &C[k * M * N];
this->template GEMM<T>(transA, transB, M, N, K, alpha, Ak, Bk, beta, Ck);
}
#endif
......
......@@ -35,7 +35,8 @@ template struct SetConstant<platform::CUDADeviceContext, bool>;
#define DEFINE_GPU_TRANS(RANK) \
template struct Transpose<platform::CUDADeviceContext, float, RANK>; \
template struct Transpose<platform::CUDADeviceContext, double, RANK>;
template struct Transpose<platform::CUDADeviceContext, double, RANK>; \
template struct Transpose<platform::CUDADeviceContext, float16, RANK>;
DEFINE_GPU_TRANS(1);
DEFINE_GPU_TRANS(2);
......
......@@ -25,7 +25,7 @@ namespace operators {
* Get row matrix shape from a vector shape. If the rank of x_dim > 1, the
* original x_dim is returned.
*/
static framework::DDim RowMatrixFromVector(const framework::DDim& x_dim) {
static framework::DDim RowMatrixFromVector(const framework::DDim &x_dim) {
if (x_dim.size() > 1) {
return x_dim;
}
......@@ -36,7 +36,7 @@ static framework::DDim RowMatrixFromVector(const framework::DDim& x_dim) {
* Get column matrix shape from a vector shape. If the rank of y_dim > 1, the
* original y_dim is returned.
*/
static framework::DDim ColumnMatrixFromVector(const framework::DDim& y_dim) {
static framework::DDim ColumnMatrixFromVector(const framework::DDim &y_dim) {
if (y_dim.size() > 1) {
return y_dim;
}
......@@ -46,12 +46,12 @@ static framework::DDim ColumnMatrixFromVector(const framework::DDim& y_dim) {
template <typename DeviceContext, typename T>
class MatMulKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& context) const override {
auto& x =
void Compute(const framework::ExecutionContext &context) const override {
auto &x =
detail::Ref(context.Input<framework::Tensor>("X"), "Cannot find X");
auto& y =
auto &y =
detail::Ref(context.Input<framework::Tensor>("Y"), "Cannot find Y");
auto* out = context.Output<framework::Tensor>("Out");
auto *out = context.Output<framework::Tensor>("Out");
out->mutable_data<T>(context.GetPlace());
auto blas = math::GetBlas<DeviceContext, T>(context);
......@@ -65,7 +65,7 @@ class MatMulKernel : public framework::OpKernel<T> {
// Reshape a rank-3 tensor from P x M x N to (P * M) x N.
// Identity op if the tensor is not of rank 3.
static framework::Tensor FoldInitDims(const framework::Tensor& input) {
static framework::Tensor FoldInitDims(const framework::Tensor &input) {
auto output = input;
auto in_dims = input.dims();
if (in_dims.size() == 3) {
......@@ -78,8 +78,8 @@ static framework::Tensor FoldInitDims(const framework::Tensor& input) {
// (Warning: This requires transposing data and writes into new memory.)
// Identity op if the tensor is not of rank 3.
template <typename DeviceContext, typename T>
static framework::Tensor FoldHeadAndLastDims(const DeviceContext& context,
const framework::Tensor& input) {
static framework::Tensor FoldHeadAndLastDims(const DeviceContext &context,
const framework::Tensor &input) {
auto in_dims = input.dims();
if (in_dims.size() != 3) {
return input;
......@@ -102,7 +102,7 @@ static framework::Tensor FoldHeadAndLastDims(const DeviceContext& context,
* If transposed, `H,W` will be swapped.
*/
static void ReshapeTensorIntoMatrixSequence(
framework::Tensor* x, const math::MatDescriptor& descriptor) {
framework::Tensor *x, const math::MatDescriptor &descriptor) {
int64_t h, w;
h = descriptor.height_;
w = descriptor.width_;
......@@ -130,9 +130,9 @@ static void ReshapeTensorIntoMatrixSequence(
* If either `X` or `Y` has batch size BatchSize, the output will have the
* BatchSize.
*/
static void ReshapeXYOutIntoMatrixSequence(framework::Tensor* x,
framework::Tensor* y,
framework::Tensor* out, bool trans_x,
static void ReshapeXYOutIntoMatrixSequence(framework::Tensor *x,
framework::Tensor *y,
framework::Tensor *out, bool trans_x,
bool trans_y) {
auto x_dim = RowMatrixFromVector(x->dims());
auto y_dim = ColumnMatrixFromVector(y->dims());
......@@ -177,10 +177,10 @@ static void ReshapeXYOutIntoMatrixSequence(framework::Tensor* x,
template <typename DeviceContext, typename T>
class MatMulGradKernel : public framework::OpKernel<T> {
public:
void MatMul(const framework::ExecutionContext& context,
const framework::Tensor& a, bool trans_a,
const framework::Tensor& b, bool trans_b,
framework::Tensor* out) const {
void MatMul(const framework::ExecutionContext &context,
const framework::Tensor &a, bool trans_a,
const framework::Tensor &b, bool trans_b,
framework::Tensor *out) const {
out->mutable_data<T>(context.GetPlace());
auto blas = math::GetBlas<DeviceContext, T>(context);
auto mat_dim_a = math::CreateMatrixDescriptor(a.dims(), 0, trans_a);
......@@ -188,18 +188,18 @@ class MatMulGradKernel : public framework::OpKernel<T> {
blas.MatMul(a, mat_dim_a, b, mat_dim_b, T(1), out, T(0));
}
void CalcInputGrad(const framework::ExecutionContext& context,
const framework::Tensor& a, bool trans_a,
bool is_fold_init_dims_a, const framework::Tensor& b,
void CalcInputGrad(const framework::ExecutionContext &context,
const framework::Tensor &a, bool trans_a,
bool is_fold_init_dims_a, const framework::Tensor &b,
bool trans_b, bool is_fold_init_dims_b,
framework::Tensor* out) const {
framework::Tensor *out) const {
if (out == nullptr) return;
bool need_combine = (a.dims().size() == 3 || b.dims().size() == 3) &&
out->dims().size() == 2;
if (!need_combine) {
MatMul(context, a, trans_a, b, trans_b, out);
} else {
auto& ctx = context.template device_context<DeviceContext>();
auto &ctx = context.template device_context<DeviceContext>();
MatMul(context, is_fold_init_dims_a
? FoldInitDims(a)
: FoldHeadAndLastDims<DeviceContext, T>(ctx, a),
......@@ -210,13 +210,13 @@ class MatMulGradKernel : public framework::OpKernel<T> {
}
}
void Compute(const framework::ExecutionContext& context) const override {
void Compute(const framework::ExecutionContext &context) const override {
auto x = *context.Input<framework::Tensor>("X");
auto y = *context.Input<framework::Tensor>("Y");
auto dout =
*context.Input<framework::Tensor>(framework::GradVarName("Out"));
auto* dx = context.Output<framework::Tensor>(framework::GradVarName("X"));
auto* dy = context.Output<framework::Tensor>(framework::GradVarName("Y"));
auto *dx = context.Output<framework::Tensor>(framework::GradVarName("X"));
auto *dy = context.Output<framework::Tensor>(framework::GradVarName("Y"));
bool transpose_x = context.Attr<bool>("transpose_X");
bool transpose_y = context.Attr<bool>("transpose_Y");
......@@ -269,7 +269,7 @@ class MatMulOp : public framework::OperatorWithKernel {
using framework::OperatorWithKernel::OperatorWithKernel;
protected:
void InferShape(framework::InferShapeContext* context) const override {
void InferShape(framework::InferShapeContext *context) const override {
PADDLE_ENFORCE(context->HasInput("X"),
"Input(X) of MatMulOp should not be null.");
PADDLE_ENFORCE(context->HasInput("Y"),
......@@ -375,7 +375,7 @@ class MatMulOpGrad : public framework::OperatorWithKernel {
using framework::OperatorWithKernel::OperatorWithKernel;
protected:
void InferShape(framework::InferShapeContext* context) const override {
void InferShape(framework::InferShapeContext *context) const override {
PADDLE_ENFORCE(context->HasInput("X"), "Input(X) should not be null");
PADDLE_ENFORCE(context->HasInput("Y"), "Input(Y) should not be null");
PADDLE_ENFORCE(context->HasInput(framework::GradVarName("Out")),
......@@ -401,7 +401,7 @@ class MatMulOpGradMaker : public framework::SingleGradOpDescMaker {
protected:
std::unique_ptr<framework::OpDesc> Apply() const override {
auto* retv = new framework::OpDesc();
auto *retv = new framework::OpDesc();
retv->SetType("matmul_grad");
retv->SetInput("X", Input("X"));
retv->SetInput("Y", Input("Y"));
......@@ -420,15 +420,27 @@ REGISTER_OPERATOR(matmul, ops::MatMulOp, ops::MatMulOpMaker,
ops::MatMulOpGradMaker);
REGISTER_OPERATOR(matmul_grad, ops::MatMulOpGrad);
REGISTER_OP_CPU_KERNEL(
matmul, ops::MatMulKernel<paddle::platform::CPUDeviceContext, float>);
matmul, ops::MatMulKernel<paddle::platform::CPUDeviceContext, float>,
ops::MatMulKernel<paddle::platform::CPUDeviceContext, double>,
ops::MatMulKernel<paddle::platform::CPUDeviceContext,
paddle::platform::float16>);
REGISTER_OP_CPU_KERNEL(
matmul_grad,
ops::MatMulGradKernel<paddle::platform::CPUDeviceContext, float>);
ops::MatMulGradKernel<paddle::platform::CPUDeviceContext, float>,
ops::MatMulGradKernel<paddle::platform::CPUDeviceContext, double>,
ops::MatMulGradKernel<paddle::platform::CPUDeviceContext,
paddle::platform::float16>);
#ifdef PADDLE_WITH_CUDA
REGISTER_OP_CUDA_KERNEL(
matmul, ops::MatMulKernel<paddle::platform::CUDADeviceContext, float>);
matmul, ops::MatMulKernel<paddle::platform::CUDADeviceContext, float>,
ops::MatMulKernel<paddle::platform::CUDADeviceContext, double>,
ops::MatMulKernel<paddle::platform::CUDADeviceContext,
paddle::platform::float16>);
REGISTER_OP_CUDA_KERNEL(
matmul_grad,
ops::MatMulGradKernel<paddle::platform::CUDADeviceContext, float>);
ops::MatMulGradKernel<paddle::platform::CUDADeviceContext, float>,
ops::MatMulGradKernel<paddle::platform::CUDADeviceContext, double>,
ops::MatMulGradKernel<paddle::platform::CUDADeviceContext,
paddle::platform::float16>);
#endif
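With these registrations the matmul op also covers double and float16. A hedged sketch of exercising the float16 kernel from Python (assumes a CUDA build; `append_batch_size=False` so the feeds match the declared shapes):

```python
import numpy
import paddle.fluid as fluid

x = fluid.layers.data(name='x', shape=[2, 3], dtype='float16',
                      append_batch_size=False)
y = fluid.layers.data(name='y', shape=[3, 2], dtype='float16',
                      append_batch_size=False)
out = fluid.layers.matmul(x, y)

exe = fluid.Executor(fluid.CUDAPlace(0))
exe.run(fluid.default_startup_program())
result, = exe.run(fluid.default_main_program(),
                  feed={'x': numpy.random.rand(2, 3).astype('float16'),
                        'y': numpy.random.rand(3, 2).astype('float16')},
                  fetch_list=[out])
```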
......@@ -503,12 +503,13 @@ All parameter, weight, gradient are variables in Paddle.
const ProgramDesc &main_program, const std::string &loss_var_name,
Scope *scope, std::vector<Scope *> &local_scopes,
bool allow_op_delay, bool use_default_grad_scale,
size_t num_trainers, size_t trainer_id) {
bool balance_parameter_opt_between_cards, size_t num_trainers,
size_t trainer_id) {
new (&self) ParallelExecutor(
num_threads, use_event, places, params, bcast_vars,
main_program, loss_var_name, scope, local_scopes,
allow_op_delay, use_default_grad_scale, num_trainers,
trainer_id);
allow_op_delay, use_default_grad_scale,
balance_parameter_opt_between_cards, num_trainers, trainer_id);
})
.def("bcast_params", &ParallelExecutor::BCastParamsToGPUs)
// NOTE: even we return a vec<Scope*>* to Python use reference policy.
......
......@@ -16,6 +16,7 @@ from __future__ import print_function
import core
import numpy
import six.moves as six
import multiprocessing
from framework import Variable, default_main_program
......@@ -116,3 +117,60 @@ class DataFeeder(object):
for each_name, each_converter in six.zip(self.feed_names, converter):
ret_dict[each_name] = each_converter.done()
return ret_dict
def feed_parallel(self, iterable, num_places=None):
if isinstance(self.place, core.CUDAPlace):
places = [
core.CUDAPlace(i)
for i in six.xrange(self._get_number_of_places_(num_places))
]
else:
places = [
core.CPUPlace()
for _ in six.xrange(self._get_number_of_places_(num_places))
]
if len(iterable) != len(places):
raise ValueError("feed_parallel takes multiple mini-batches. Each "
"mini-batch will be feed on each device. The "
"number of devices and number of mini-batches "
"must be same.")
place = self.place
for p, batch in six.zip(places, iterable):
self.place = p
yield self.feed(batch)
self.place = place
def _get_number_of_places_(self, num_places):
if num_places is not None:
return int(num_places)
elif isinstance(self.place, core.CUDAPlace):
return core.get_cuda_device_count()
else:
return multiprocessing.cpu_count()
def decorate_reader(self,
reader,
multi_devices,
num_places=None,
drop_last=True):
def __reader_creator__():
if not multi_devices:
for item in reader():
yield self.feed(item)
else:
num = self._get_number_of_places_(num_places)
item = []
for batch in reader():
item.append(batch)
if len(item) == num:
yield list(self.feed_parallel(item, num))
item = []
if not drop_last and len(item) != 0:
raise ValueError(
"Dropping the trailing mini-batches that cannot fill all "
"devices is not implemented. Other strategies are not "
"implemented either.")
return __reader_creator__
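The decorated reader yields, per iteration, a list of feed dicts — one per device — ready to pass to ParallelExecutor.run. A usage sketch patterned on the TestFeedParallel case further below:

```python
import paddle
import paddle.fluid as fluid
import paddle.dataset.flowers as flowers

place = fluid.CUDAPlace(0)
data = fluid.layers.data(name='image', shape=[3, 224, 224], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
# Each iteration yields one feed dict per device.
reader = feeder.decorate_reader(
    paddle.batch(flowers.train(), batch_size=16), multi_devices=True)
```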
......@@ -16,31 +16,42 @@ import core
import framework
import executor
import io
from trainer import check_and_get_place
__all__ = ['Inferencer', ]
class Inferencer(object):
def __init__(self, network_func, param_path=None, place=None):
# 1. we need to generate a framework.Program by calling
# network_func. Reference: fluid.program_guard in test_word2vec.py
# 2. move the default_main_program to self.program.
# 3. run the default_startup program.
# 4. load params from param_path into scope
def __init__(self, param_path, place=None):
"""
:param param_path: the path where the inference model is saved by fluid.io.save_inference_model
:param place: place to do the inference
"""
self.param_path = param_path
self.scope = core.Scope()
self.place = place
self.startup_program = framework.Program()
# TODO: generate the startup_program with network_func
exe = executor.Executor(place)
exe.run(self.startup_program, scope=self.scope)
if param_path:
self.exe = executor.Executor(check_and_get_place(place))
with executor.scope_guard(self.scope):
# load params from param_path into scope
io.load_persistables(exe, dirname=param_path)
def infer(self, inputs):
# run self.program
pass
[self.inference_program, _,
self.fetch_targets] = io.load_inference_model(
executor=self.exe, dirname=param_path)
def infer(self, inputs, return_numpy=True):
"""
:param inputs: a map of {"input_name": input_var} that will be fed into the inference program
to get the predict value
:param return_numpy: whether to convert the result tensors to numpy arrays
:return: the predict value of the inference model
"""
if not isinstance(inputs, dict):
raise ValueError(
"inputs should be a map of {'input_name': input_var}")
with executor.scope_guard(self.scope):
results = self.exe.run(self.inference_program,
feed=inputs,
fetch_list=self.fetch_targets,
return_numpy=return_numpy)
return results
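A usage sketch of the reworked Inferencer, mirroring the recognize_digits tests below (the 'img' input name comes from those tests; the model path is illustrative):

```python
import numpy
import paddle.fluid as fluid

inferencer = fluid.Inferencer(param_path="./recognize_digits.model",
                              place=fluid.CPUPlace())
tensor_img = numpy.random.uniform(-1.0, 1.0, [1, 1, 28, 28]).astype("float32")
results = inferencer.infer({'img': tensor_img})
print(results[0])
```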
......@@ -263,6 +263,9 @@ def get_inference_program(target_vars, main_program=None):
def prepend_feed_ops(inference_program,
feed_target_names,
feed_holder_name='feed'):
if len(feed_target_names) == 0:
return
global_block = inference_program.global_block()
feed_var = global_block.create_var(
name=feed_holder_name,
......@@ -323,6 +326,7 @@ def save_inference_model(dirname,
if isinstance(feeded_var_names, basestring):
feeded_var_names = [feeded_var_names]
else:
if len(feeded_var_names) > 0:
if not (bool(feeded_var_names) and all(
isinstance(name, basestring) for name in feeded_var_names)):
raise ValueError("'feed_var_names' should be a list of str.")
......
......@@ -31,6 +31,7 @@ class ParallelExecutor(object):
allow_op_delay=False,
share_vars_from=None,
use_default_grad_scale=True,
balance_parameter_opt_between_cards=False,
num_trainers=0,
trainer_id=0):
"""
......@@ -53,6 +54,9 @@ class ParallelExecutor(object):
gradients of each device and scaled gradients would be
aggregated. Otherwise, a customized scale value should be fed
to the network.
balance_parameter_opt_between_cards(bool, default False): Whether
to update different gradients on different cards. Currently, it
is not recommended.
num_trainers(int, default 0): If greater than 0, NCCL will be
initialized with multiple ranks of nodes, and each node should have
the same number of GPUs. Distributed training will then be enabled.
......@@ -137,6 +141,7 @@ class ParallelExecutor(object):
local_scopes,
allow_op_delay,
use_default_grad_scale,
balance_parameter_opt_between_cards,
num_trainers,
trainer_id)
self.scope = scope
......
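A minimal sketch of enabling the new strategy (`loss` and `feed_dict` are assumed to come from an already-built program, as in the updated tests below):

```python
import paddle.fluid as fluid

# loss and feed_dict are assumed to exist from an already-built program.
pe = fluid.ParallelExecutor(
    use_cuda=True,
    loss_name=loss.name,
    balance_parameter_opt_between_cards=True)
loss_value, = pe.run(fetch_list=[loss.name], feed=feed_dict)
```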
......@@ -5,3 +5,5 @@ string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
foreach(src ${TEST_OPS})
py_test(${src} SRCS ${src}.py)
endforeach()
add_subdirectory(high-level-api)
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
# default test
foreach(src ${TEST_OPS})
py_test(${src} SRCS ${src}.py)
endforeach()
add_subdirectory(recognize_digits)
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
# default test
foreach(src ${TEST_OPS})
py_test(${src} SRCS ${src}.py)
endforeach()
......@@ -21,7 +21,6 @@ import unittest
import math
import sys
import os
import paddle.v2.dataset as dataset
BATCH_SIZE = 64
......@@ -54,47 +53,65 @@ def train_program():
predict = inference_program()
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(cost)
acc = fluid.layers.accuracy(input=predict, label=label)
return avg_cost, acc
# acc = fluid.layers.accuracy(input=predict, label=label)
# return avg_cost, acc
return avg_cost
def train(use_cuda, save_dirname):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
optimizer = fluid.optimizer.Adam(learning_rate=0.001)
trainer = fluid.Trainer(train_program, place=place, optimizer=optimizer)
trainer = fluid.Trainer(
train_func=train_program,
infer_func=inference_program,
place=place,
optimizer=optimizer)
def event_handler(event):
if isinstance(event, fluid.EndIteration):
avg_cost, acc = event.values
print("avg_cost: %s" % avg_cost)
print("acc : %s" % acc)
if (event.batch_id + 1) % 10 == 0:
test_metrics = trainer.test(reader=dataset.mnist.test())
avg_cost_set = test_metrics[0]
acc_set = test_metrics[1]
# get test acc and loss
acc = numpy.array(acc_set).mean()
avg_cost = numpy.array(avg_cost_set).mean()
if float(acc) > 0.2: # Smaller value to increase CI speed
trainer.save_params(save_dirname)
else:
print('BatchID {0}, Test Loss {1:0.2}, Acc {2:0.2}'.format(
event.batch_id + 1, float(avg_cost), float(acc)))
if math.isnan(float(avg_cost)):
sys.exit("got NaN loss, training failed.")
if isinstance(event, fluid.EndEpochEvent):
# if (event.epoch + 1) % 10 == 0:
# trainer.save_params(save_dirname)
trainer.save_inference_model(save_dirname)
# TODO: Uncomment this part once we are sure that .train is working
# test_reader = paddle.batch(
# paddle.dataset.mnist.test(), batch_size=BATCH_SIZE)
# test_metrics = trainer.test(reader=test_reader)
# avg_cost_set = test_metrics[0]
# acc_set = test_metrics[1]
#
# # get test acc and loss
# acc = numpy.array(acc_set).mean()
# avg_cost = numpy.array(avg_cost_set).mean()
#
# print("avg_cost: %s" % avg_cost)
# print("acc : %s" % acc)
#
# if float(acc) > 0.2: # Smaller value to increase CI speed
# trainer.save_params(save_dirname)
# else:
# print('BatchID {0}, Test Loss {1:0.2}, Acc {2:0.2}'.format(
# event.epoch + 1, float(avg_cost), float(acc)))
# if math.isnan(float(avg_cost)):
# sys.exit("got NaN loss, training failed.")
train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.mnist.train(), buf_size=500),
batch_size=BATCH_SIZE)
trainer.train(
reader=dataset.mnist.train(), num_pass=100, event_handler=event_handler)
num_epochs=1,
event_handler=event_handler,
reader=train_reader,
feed_order=['img', 'label'])
def infer(use_cuda, save_dirname=None):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
inferencer = fluid.Inferencer(
inference_program, param_path=save_dirname, place=place)
inferencer = fluid.Inferencer(param_path=save_dirname, place=place)
batch_size = 1
tensor_img = numpy.random.uniform(-1.0, 1.0,
......@@ -114,5 +131,5 @@ def main(use_cuda):
if __name__ == '__main__':
for use_cuda in (False, True):
main(use_cuda=use_cuda)
# for use_cuda in (False, True):
main(use_cuda=False)
......@@ -21,7 +21,6 @@ import unittest
import math
import sys
import os
import paddle.v2.dataset as dataset
BATCH_SIZE = 64
......@@ -41,47 +40,64 @@ def train_program():
predict = inference_program()
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(cost)
acc = fluid.layers.accuracy(input=predict, label=label)
return avg_cost, acc
# acc = fluid.layers.accuracy(input=predict, label=label)
# return avg_cost, acc
return avg_cost
def train(use_cuda, save_dirname):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
optimizer = fluid.optimizer.Adam(learning_rate=0.001)
trainer = fluid.Trainer(train_program, place=place, optimizer=optimizer)
trainer = fluid.Trainer(
train_func=train_program,
infer_func=inference_program,
place=place,
optimizer=optimizer)
def event_handler(event):
if isinstance(event, fluid.EndIteration):
avg_cost, acc = event.values
print("avg_cost: %s" % avg_cost)
print("acc : %s" % acc)
if (event.batch_id + 1) % 10 == 0:
test_metrics = trainer.test(reader=dataset.mnist.test())
avg_cost_set = test_metrics[0]
acc_set = test_metrics[1]
# get test acc and loss
acc = numpy.array(acc_set).mean()
avg_cost = numpy.array(avg_cost_set).mean()
if float(acc) > 0.2: # Smaller value to increase CI speed
trainer.save_params(save_dirname)
else:
print('BatchID {0}, Test Loss {1:0.2}, Acc {2:0.2}'.format(
event.batch_id + 1, float(avg_cost), float(acc)))
if math.isnan(float(avg_cost)):
sys.exit("got NaN loss, training failed.")
if isinstance(event, fluid.EndEpochEvent):
# if (event.epoch + 1) % 10 == 0:
trainer.save_inference_model(save_dirname)
# TODO: Uncomment this part once we are sure that .train is working
# test_reader = paddle.batch(
# paddle.dataset.mnist.test(), batch_size=BATCH_SIZE)
# test_metrics = trainer.test(reader=test_reader)
# avg_cost_set = test_metrics[0]
# acc_set = test_metrics[1]
#
# # get test acc and loss
# acc = numpy.array(acc_set).mean()
# avg_cost = numpy.array(avg_cost_set).mean()
#
# print("avg_cost: %s" % avg_cost)
# print("acc : %s" % acc)
#
# if float(acc) > 0.2: # Smaller value to increase CI speed
# trainer.save_params(save_dirname)
# else:
# print('BatchID {0}, Test Loss {1:0.2}, Acc {2:0.2}'.format(
# event.epoch + 1, float(avg_cost), float(acc)))
# if math.isnan(float(avg_cost)):
# sys.exit("got NaN loss, training failed.")
train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.mnist.train(), buf_size=500),
batch_size=BATCH_SIZE)
trainer.train(
reader=dataset.mnist.train(), num_pass=100, event_handler=event_handler)
num_epochs=1,
event_handler=event_handler,
reader=train_reader,
feed_order=['img', 'label'])
def infer(use_cuda, save_dirname=None):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
inferencer = fluid.Inferencer(
inference_program, param_path=save_dirname, place=place)
inferencer = fluid.Inferencer(param_path=save_dirname, place=place)
batch_size = 1
tensor_img = numpy.random.uniform(-1.0, 1.0,
......@@ -101,5 +117,5 @@ def main(use_cuda):
if __name__ == '__main__':
for use_cuda in (False, True):
main(use_cuda=use_cuda)
# for use_cuda in (False, True):
main(use_cuda=False)
......@@ -99,45 +99,45 @@ def train(use_cuda, is_sparse, save_path):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
def event_handler(event):
# print type(event)
if isinstance(event, fluid.EndEpochEvent):
outs = trainer.test(reader=test_reader)
avg_cost = outs[0]
print("loss= ", avg_cost)
if avg_cost < 5.0:
trainer.save_params(save_path)
trainer.save_inference_model(save_path)
return
if math.isnan(avg_cost):
sys.exit("got NaN loss, training failed.")
trainer = fluid.Trainer(
partial(train_program, is_sparse),
partial(inference_program, is_sparse),
fluid.optimizer.SGD(learning_rate=0.001),
place=place)
trainer.train(
reader=train_reader, num_epochs=100, event_handler=event_handler)
reader=train_reader, num_epochs=1, event_handler=event_handler)
def infer(use_cuda, is_sparse, save_path):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
inferencer = fluid.Inferencer(
partial(inference_program, is_sparse),
param_path=save_path,
place=place)
inferencer = fluid.Inferencer(param_path=save_path, place=place)
lod = [0, 1]
first_word = create_random_lodtensor(lod, place, low=0, high=dict_size - 1)
second_word = create_random_lodtensor(lod, place, low=0, high=dict_size - 1)
third_word = create_random_lodtensor(lod, place, low=0, high=dict_size - 1)
fourth_word = create_random_lodtensor(lod, place, low=0, high=dict_size - 1)
result = inferencer.infer({
result = inferencer.infer(
{
'firstw': first_word,
'secondw': second_word,
'thirdw': third_word,
'forthw': fourth_word
})
print(result)
},
return_numpy=False)
print(np.array(result[0]))
def main(use_cuda, is_sparse):
......
......@@ -205,7 +205,8 @@ class TestParallelExecutorBase(unittest.TestCase):
allow_op_delay=False,
feed_dict=None,
seed=None,
use_parallel_executor=True):
use_parallel_executor=True,
balance_parameter_opt_between_cards=False):
def run_executor(exe, feed, fetch_list, program=None):
if isinstance(exe, fluid.ParallelExecutor):
res = exe.run(fetch_list=fetch_list, feed=feed)
......@@ -234,7 +235,11 @@ class TestParallelExecutorBase(unittest.TestCase):
if use_parallel_executor:
exe = fluid.ParallelExecutor(
True, loss_name=loss.name, allow_op_delay=allow_op_delay)
True,
loss_name=loss.name,
allow_op_delay=allow_op_delay,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
else:
exe = fluid.Executor(place=place)
......@@ -280,20 +285,27 @@ class TestMNIST(TestParallelExecutorBase):
fluid.recordio_writer.convert_reader_to_recordio_file(
'./mnist.recordio', reader, feeder)
def check_simple_fc_convergence(self):
def check_simple_fc_convergence(self, balance_parameter_opt_between_cards):
self.check_network_convergence(simple_fc_net)
self.check_network_convergence(simple_fc_net, allow_op_delay=True)
img = np.zeros(shape=[32, 784], dtype='float32')
label = np.ones(shape=[32, 1], dtype='int64')
self.check_network_convergence(
simple_fc_net, feed_dict={"image": img,
"label": label})
simple_fc_net,
feed_dict={"image": img,
"label": label},
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
def test_simple_fc(self):
self.check_simple_fc_convergence()
self.check_simple_fc_convergence(False)
def test_simple_fc_with_new_strategy(self):
self.check_simple_fc_convergence(True)
def check_simple_fc_parallel_accuracy(self):
def check_simple_fc_parallel_accuracy(self,
balance_parameter_opt_between_cards):
img = np.zeros(shape=[32, 784], dtype='float32')
label = np.ones(shape=[32, 1], dtype='int64')
single_first_loss, single_last_loss = self.check_network_convergence(
......@@ -307,7 +319,9 @@ class TestMNIST(TestParallelExecutorBase):
seed=1000,
feed_dict={"image": img,
"label": label},
use_parallel_executor=True)
use_parallel_executor=True,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
for p_f in parallel_first_loss:
self.assertAlmostEquals(p_f, single_first_loss[0], delta=1e-6)
......@@ -315,18 +329,28 @@ class TestMNIST(TestParallelExecutorBase):
self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6)
def test_simple_fc_parallel_accuracy(self):
self.check_simple_fc_parallel_accuracy()
self.check_simple_fc_parallel_accuracy(False)
def check_batchnorm_fc_convergence(self):
def test_simple_fc_parallel_accuracy_with_new_strategy(self):
self.check_simple_fc_parallel_accuracy(True)
def check_batchnorm_fc_convergence(self,
balance_parameter_opt_between_cards):
self.check_network_convergence(fc_with_batchnorm)
img = np.zeros(shape=[32, 784], dtype='float32')
label = np.ones(shape=[32, 1], dtype='int64')
self.check_network_convergence(
fc_with_batchnorm, feed_dict={"image": img,
"label": label})
fc_with_batchnorm,
feed_dict={"image": img,
"label": label},
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
def test_batchnorm_fc(self):
self.check_batchnorm_fc_convergence()
self.check_batchnorm_fc_convergence(False)
def test_batchnorm_fc_with_new_strategy(self):
self.check_batchnorm_fc_convergence(True)
class TestResnet(TestParallelExecutorBase):
......@@ -348,17 +372,22 @@ class TestResnet(TestParallelExecutorBase):
# fluid.recordio_writer.convert_reader_to_recordio_file(
# "./flowers.recordio", reader, feeder, compressor=fluid.core.RecordIOWriter.Compressor.NoCompress)
def check_resnet_convergence(self):
def check_resnet_convergence(self, balance_parameter_opt_between_cards):
import functools
batch_size = 2
self.check_network_convergence(
functools.partial(
SE_ResNeXt50Small, batch_size=batch_size),
iter=20,
batch_size=batch_size)
batch_size=batch_size,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
def test_resnet(self):
self.check_resnet_convergence()
self.check_resnet_convergence(False)
def test_resnet_with_new_strategy(self):
self.check_resnet_convergence(True)
class ModelHyperParams(object):
......@@ -519,7 +548,7 @@ class TestTransformer(TestParallelExecutorBase):
class ParallelExecutorTestingDuringTraining(unittest.TestCase):
def check_network_convergence(self):
def check_network_convergence(self, balance_parameter_opt_between_cards):
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
......@@ -539,12 +568,18 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase):
feed_dict = {'image': image, 'label': label}
train_exe = fluid.ParallelExecutor(
use_cuda=True, loss_name=loss.name, main_program=main)
use_cuda=True,
loss_name=loss.name,
main_program=main,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
test_exe = fluid.ParallelExecutor(
use_cuda=True,
main_program=test_program,
share_vars_from=train_exe)
share_vars_from=train_exe,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
for i in xrange(5):
test_loss, = test_exe.run([loss.name], feed=feed_dict)
......@@ -558,8 +593,11 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase):
"Train loss: " + str(train_loss) + "\n Test loss:" +
str(test_loss))
def test_parallel(self):
self.check_network_convergence()
def test_parallel_testing(self):
self.check_network_convergence(False)
def test_parallel_testing_with_new_strategy(self):
self.check_network_convergence(True)
import paddle.dataset.conll05 as conll05
......@@ -579,7 +617,7 @@ embedding_name = 'emb'
def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark,
is_sparse, **ignored):
is_sparse, balance_parameter_opt_between_cards, **ignored):
# 8 features
predicate_embedding = fluid.layers.embedding(
input=predicate,
......@@ -648,7 +686,9 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark,
class TestCRFModel(unittest.TestCase):
def check_network_convergence(self, is_sparse):
def check_network_convergence(self,
is_sparse,
balance_parameter_opt_between_cards=False):
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
......@@ -696,7 +736,11 @@ class TestCRFModel(unittest.TestCase):
exe = fluid.Executor(place)
exe.run(startup)
pe = fluid.ParallelExecutor(use_cuda=True, loss_name=avg_cost.name)
pe = fluid.ParallelExecutor(
use_cuda=True,
loss_name=avg_cost.name,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
feeder = fluid.DataFeeder(
feed_list=[
......@@ -718,6 +762,14 @@ class TestCRFModel(unittest.TestCase):
def test_update_dense_parameter(self):
self.check_network_convergence(is_sparse=False)
def test_update_sparse_parameter_with_new_strategy(self):
self.check_network_convergence(
is_sparse=False, balance_parameter_opt_between_cards=True)
def test_update_dense_parameter_with_new_strategy(self):
self.check_network_convergence(
is_sparse=False, balance_parameter_opt_between_cards=True)
# test fetch all the variables of global_block
......@@ -796,5 +848,42 @@ class TestFetchOp(unittest.TestCase):
self.parallel_exe(train_inputs, seed=1)
class TestFeedParallel(unittest.TestCase):
def test_main(self):
main = fluid.Program()
startup = fluid.Program()
startup.random_seed = 1
with fluid.scope_guard(fluid.core.Scope()):
with fluid.program_guard(main, startup):
data = fluid.layers.data(
name='image', shape=[3, 224, 224], dtype='float32')
label = fluid.layers.data(
name='label', shape=[1], dtype='int64')
out = Lenet(data, class_dim=102)
loss = fluid.layers.cross_entropy(input=out, label=label)
loss = fluid.layers.mean(loss)
opt = fluid.optimizer.Momentum(
learning_rate=0.1,
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4))
opt.minimize(loss)
place = fluid.CUDAPlace(0)
feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
reader = feeder.decorate_reader(
paddle.batch(
flowers.train(), batch_size=16), multi_devices=True)
exe = fluid.Executor(place)
exe.run(startup)
pe = fluid.ParallelExecutor(
use_cuda=True, loss_name=loss.name, main_program=main)
for batch_id, data in enumerate(reader()):
loss_np = np.array(pe.run(feed=data, fetch_list=[loss.name])[0])
print batch_id, loss_np
if batch_id == 2:
break
if __name__ == '__main__':
unittest.main()
......@@ -19,7 +19,7 @@ import executor
import data_feeder
import contextlib
import io
import transpiler
import unique_name
# optimizer is same as the parameter of Trainer.__init__. Rename it to opt_module
import optimizer as opt_module
......@@ -56,26 +56,62 @@ class EndStepEvent(object):
self.step = step_id
def check_and_get_place(place):
"""
Check the type of place or get the default place
Args:
place(None|core.CUDAPlace|core.CPUPlace): the place that trainer will be executed on.
Raises:
TypeError if the type mismatched.
Returns:
the original place if it is not None.
if fluid is compiled with CUDA, returns CUDAPlace(0) by default.
Otherwise returns CPUPlace by default.
"""
if place is None:
if core.is_compiled_with_cuda():
return core.CUDAPlace(0)
else:
return core.CPUPlace()
else:
if not isinstance(place, core.CUDAPlace) and not isinstance(
place, core.CPUPlace):
raise TypeError("Place should be either CUDAPlace or CPUPlace")
return place
class Trainer(object):
"""
Args:
program_func(callable): A function which will return loss. The loss must be a scaler.
train_func(callable): A function which will return loss. The loss must be a scalar.
infer_func(callable): A function which will return predict, used to save inference model
optimizer(optimizer.Optimizer): The optimizer should be an instance of Optimizer
place: The device place of this trainer.
"""
def __init__(self, program_func, optimizer, param_path=None, place=None):
def __init__(self,
train_func,
infer_func,
optimizer,
param_path=None,
place=None):
# 1. we need to generate a framework.Program by calling
# program_func. Reference: fluid.program_guard in
# test_word2vec.py
if not isinstance(optimizer, opt_module.Optimizer):
raise TypeError("The optimizer should be an instance of Optimizer")
self.infer_func = infer_func
self.scope = core.Scope()
self.startup_program = framework.Program()
self.train_program = framework.Program()
with framework.program_guard(self.train_program, self.startup_program):
program_func_outs = program_func()
program_func_outs = train_func()
self.test_outputs = program_func_outs if isinstance(
program_func_outs, list) else [program_func_outs]
self.test_program = self.train_program.clone()
......@@ -86,9 +122,9 @@ class Trainer(object):
loss = self.test_outputs[0]
optimize_ops, params_grads = optimizer.minimize(loss)
self.place = Trainer._check_and_get_place(place)
self.place = check_and_get_place(place)
self.dist_transpile_if_necessary(optimize_ops, params_grads)
self._dist_transpile_if_necessary(optimize_ops, params_grads)
# 2. move the default_main_program to self.program and run the
# default_startup program on an empty core.Scope()
......@@ -101,7 +137,7 @@ class Trainer(object):
# load params from param_path into scope
io.load_persistables(exe, dirname=param_path)
def dist_transpile_if_necessary(self, optimize_ops, params_grads):
def _dist_transpile_if_necessary(self, optimize_ops, params_grads):
if "PADDLE_TRAINING_ROLE" not in os.environ:
return
......@@ -190,31 +226,14 @@ class Trainer(object):
exe = executor.Executor(self.place)
io.save_persistables(exe, dirname=param_path)
@staticmethod
def _check_and_get_place(place):
"""
Check the type of place or get the default place
Args:
place(None|core.CUDAPlace|core.CPUPlace): the place that trainer will be executed on.
Raises:
TypeError if the type mismatched.
Returns:
the original place if it is not None.
if fluid is compiled with CUDA, returns CUDAPlace(0) by default.
Otherwise returns CPUPlace by default.
"""
if place is None:
if core.is_compiled_with_cuda():
return core.CUDAPlace(0)
else:
return core.CPUPlace()
else:
if not isinstance(place, core.CUDAPlace) and not isinstance(
place, core.CPUPlace):
raise TypeError("Place should be either CUDAPlace or CPUPlace")
return place
def save_inference_model(self, model_path):
inference_program = framework.Program()
with framework.program_guard(inference_program):
with unique_name.guard():
predict_var = self.infer_func()
predict_var = self.train_program.block(0).var(predict_var.name)
exe = executor.Executor(self.place)
io.save_inference_model(model_path, [], [predict_var], exe)
@contextlib.contextmanager
def _prog_and_scope_guard(self):
......
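Putting the new Trainer contract together — a train_func/infer_func pair, then save_inference_model at epoch end. This mirrors the recognize_digits tests above; `train_program`, `inference_program`, and `train_reader` are assumed to be defined as in those tests:

```python
import paddle.fluid as fluid

trainer = fluid.Trainer(
    train_func=train_program,      # returns the scalar loss
    infer_func=inference_program,  # returns the predict variable
    place=fluid.CPUPlace(),
    optimizer=fluid.optimizer.Adam(learning_rate=0.001))

def event_handler(event):
    if isinstance(event, fluid.EndEpochEvent):
        trainer.save_inference_model("./model_dir")

trainer.train(num_epochs=1,
              event_handler=event_handler,
              reader=train_reader,
              feed_order=['img', 'label'])
```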