Commit 72618c8d authored by Qiao Longfei

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into gru-add-mode

......@@ -184,7 +184,7 @@ endif()
target_link_libraries(executor garbage_collector)
cc_library(parallel_executor SRCS parallel_executor.cc DEPS
threaded_ssa_graph_executor scope_buffered_ssa_graph_executor
threaded_ssa_graph_executor scope_buffered_ssa_graph_executor parallel_ssa_graph_executor
graph build_strategy
fast_threaded_ssa_graph_executor variable_helper)
......
......@@ -77,6 +77,8 @@ cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ${SSA_GRAPH_EXECUT
cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS fetch_op_handle ssa_graph_executor scope
simple_threadpool device_context)
cc_library(parallel_ssa_graph_executor SRCS parallel_ssa_graph_executor.cc DEPS threaded_ssa_graph_executor)
cc_test(broadcast_op_test SRCS broadcast_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
device_context broadcast_op_handle)
cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
......
......@@ -19,6 +19,13 @@
#include "paddle/fluid/framework/details/variable_visitor.h"
#include "paddle/fluid/platform/profiler.h"
// asynchronous nccl allreduce or synchronous issue:
// https://github.com/PaddlePaddle/Paddle/issues/15049
DEFINE_bool(
sync_nccl_allreduce, false,
"If set true, will call `cudaStreamSynchronize(nccl_stream)`"
"after allreduce, this mode can get better performance in some scenarios.");
namespace paddle {
namespace framework {
namespace details {
......@@ -48,100 +55,104 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
void AllReduceOpHandle::RunImpl() {
platform::RecordEvent record_event(Name(), dev_ctxes_.cbegin()->second);
// FIXME(typhoonzero): If scope0(global scope) have NCCL_ID_VAR,
// this is a distributed or inter-process call, find a better way.
WaitInputVarGenerated();
auto in_var_handles = DynamicCast<VarHandle>(this->Inputs());
auto out_var_handles = DynamicCast<VarHandle>(this->Outputs());
PADDLE_ENFORCE_EQ(
in_var_handles.size(), places_.size(),
"The NoDummyInputSize should be equal to the number of places.");
PADDLE_ENFORCE_EQ(
in_var_handles.size(), out_var_handles.size(),
"The NoDummyInputSize and NoDummyOutputSize should be equal.");
std::vector<const LoDTensor *> lod_tensors;
for (size_t i = 0; i < local_scopes_.size(); ++i) {
auto *s = local_scopes_[i];
auto &local_scope = *s->FindVar(kLocalExecScopeName)->Get<Scope *>();
auto &lod_tensor =
local_scope.FindVar(in_var_handles[i]->name_)->Get<LoDTensor>();
lod_tensors.emplace_back(&lod_tensor);
PADDLE_ENFORCE_EQ(in_var_handles[i]->name_, out_var_handles[i]->name_,
"The name of input and output should be equal.");
}
if (platform::is_gpu_place(lod_tensors[0]->place())) {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
if (NoDummyInputSize() == 1 &&
local_scopes_[0]->FindLocalVar(NCCL_ID_VARNAME) == nullptr) {
#else
if (NoDummyInputSize() == 1) {
#endif
return; // No need to all reduce when GPU count = 1;
} else {
// Wait input done
WaitInputVarGenerated();
auto in_var_handles = DynamicCast<VarHandle>(this->Inputs());
auto out_var_handles = DynamicCast<VarHandle>(this->Outputs());
PADDLE_ENFORCE_EQ(
in_var_handles.size(), places_.size(),
"The NoDummyInputSize should be equal to the number of places.");
PADDLE_ENFORCE_EQ(
in_var_handles.size(), out_var_handles.size(),
"The NoDummyInputSize and NoDummyOutputSize should be equal.");
std::vector<const LoDTensor *> lod_tensors;
PADDLE_ENFORCE(nccl_ctxs_, "nccl_ctxs should not be nullptr.");
int dtype = -1;
size_t numel = 0;
std::vector<std::function<void()>> all_reduce_calls;
for (size_t i = 0; i < local_scopes_.size(); ++i) {
auto *s = local_scopes_[i];
auto &local_scope = *s->FindVar(kLocalExecScopeName)->Get<Scope *>();
auto &lod_tensor =
local_scope.FindVar(in_var_handles[i]->name_)->Get<LoDTensor>();
lod_tensors.emplace_back(&lod_tensor);
PADDLE_ENFORCE_EQ(in_var_handles[i]->name_, out_var_handles[i]->name_,
"The name of input and output should be equal.");
}
auto &p = places_[i];
auto &lod_tensor = *lod_tensors[i];
void *buffer = const_cast<void *>(lod_tensor.data<void>());
if (platform::is_gpu_place(lod_tensors[0]->place())) {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
PADDLE_ENFORCE(nccl_ctxs_, "nccl_ctxs should not be nullptr.");
int dtype = -1;
size_t numel = 0;
std::vector<std::function<void()>> all_reduce_calls;
for (size_t i = 0; i < local_scopes_.size(); ++i) {
auto &p = places_[i];
auto &lod_tensor = *lod_tensors[i];
void *buffer = const_cast<void *>(lod_tensor.data<void>());
if (dtype == -1) {
dtype = platform::ToNCCLDataType(lod_tensor.type());
}
if (dtype == -1) {
dtype = platform::ToNCCLDataType(lod_tensor.type());
}
if (numel == 0) {
numel = static_cast<size_t>(lod_tensor.numel());
}
if (numel == 0) {
numel = static_cast<size_t>(lod_tensor.numel());
int dev_id = boost::get<platform::CUDAPlace>(p).device;
auto &nccl_ctx = nccl_ctxs_->at(dev_id);
auto stream = nccl_ctx.stream();
auto comm = nccl_ctx.comm_;
all_reduce_calls.emplace_back([=] {
PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
buffer, buffer, numel, static_cast<ncclDataType_t>(dtype), ncclSum,
comm, stream));
});
}
this->RunAndRecordEvent([&] {
if (all_reduce_calls.size() == 1UL) {
// Do not use NCCLGroup when manage NCCL by per thread per device
all_reduce_calls[0]();
} else {
platform::NCCLGroupGuard guard;
for (auto &call : all_reduce_calls) {
call();
}
}
});
if (FLAGS_sync_nccl_allreduce) {
for (auto &p : places_) {
int dev_id = boost::get<platform::CUDAPlace>(p).device;
auto &nccl_ctx = nccl_ctxs_->at(dev_id);
auto stream = nccl_ctx.stream();
auto comm = nccl_ctx.comm_;
all_reduce_calls.emplace_back([=] {
PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
buffer, buffer, numel, static_cast<ncclDataType_t>(dtype),
ncclSum, comm, stream));
});
cudaStreamSynchronize(stream);
}
this->RunAndRecordEvent([&] {
platform::NCCLGroupGuard guard;
for (auto &call : all_reduce_calls) {
call();
}
});
}
#else
PADDLE_THROW("Not compiled with CUDA");
PADDLE_THROW("Not compiled with CUDA");
#endif
} else { // Special handle CPU only Operator's gradient. Like CRF
auto &trg = *this->local_scopes_[0]
->FindVar(kLocalExecScopeName)
->Get<Scope *>()
->FindVar(out_var_handles[0]->name_)
->GetMutable<framework::LoDTensor>();
// Reduce All Tensor to trg in CPU
ReduceLoDTensor func(lod_tensors, &trg);
VisitDataType(lod_tensors[0]->type(), func);
for (size_t i = 1; i < local_scopes_.size(); ++i) {
auto &scope =
*local_scopes_[i]->FindVar(kLocalExecScopeName)->Get<Scope *>();
auto &p = places_[i];
auto *var = scope.FindVar(out_var_handles[i]->name_);
auto *dev_ctx = dev_ctxes_.at(p);
RunAndRecordEvent(p, [&trg, var, dev_ctx, p] {
auto &tensor_gpu = *var->GetMutable<framework::LoDTensor>();
auto &tensor_cpu = trg;
TensorCopy(tensor_cpu, p, *dev_ctx, &tensor_gpu);
});
}
} else { // Special handle CPU only Operator's gradient. Like CRF
auto &trg = *this->local_scopes_[0]
->FindVar(kLocalExecScopeName)
->Get<Scope *>()
->FindVar(out_var_handles[0]->name_)
->GetMutable<framework::LoDTensor>();
// Reduce All Tensor to trg in CPU
ReduceLoDTensor func(lod_tensors, &trg);
VisitDataType(lod_tensors[0]->type(), func);
for (size_t i = 1; i < local_scopes_.size(); ++i) {
auto &scope =
*local_scopes_[i]->FindVar(kLocalExecScopeName)->Get<Scope *>();
auto &p = places_[i];
auto *var = scope.FindVar(out_var_handles[i]->name_);
auto *dev_ctx = dev_ctxes_.at(p);
RunAndRecordEvent(p, [&trg, var, dev_ctx, p] {
auto &tensor_gpu = *var->GetMutable<framework::LoDTensor>();
auto &tensor_cpu = trg;
TensorCopy(tensor_cpu, p, *dev_ctx, &tensor_gpu);
});
}
}
}
......
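Note: a minimal sketch (not the exact committed code) of the extra synchronization that FLAGS_sync_nccl_allreduce turns on, assuming the nccl_ctxs_ and places_ members used above:

// After the grouped ncclAllReduce calls have been issued asynchronously,
// optionally block on every per-device NCCL stream so the allreduce has
// finished before the op handle returns.
if (FLAGS_sync_nccl_allreduce) {
  for (auto &p : places_) {
    int dev_id = boost::get<platform::CUDAPlace>(p).device;
    auto &nccl_ctx = nccl_ctxs_->at(dev_id);
    cudaStreamSynchronize(nccl_ctx.stream());  // wait for this device's stream
  }
}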
......@@ -31,7 +31,11 @@ namespace framework {
namespace details {
static inline bool SeqOnlyAllReduceOps(const BuildStrategy &strategy) {
return (!strategy.enable_sequential_execution_ && strategy.num_trainers_ > 1);
// Should fix the allreduce op order if scheduling
// them in multiple threads or processes to avoid hang.
return (!strategy.enable_sequential_execution_ &&
strategy.num_trainers_ > 1) ||
strategy.enable_parallel_graph_;
}
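// e.g. when enable_parallel_graph_ is true each device runs its own graph, so
// the allreduce ops must be kept in a fixed order; otherwise the per-graph
// NCCL calls may interleave differently across ranks and hang, which is what
// the comment inside SeqOnlyAllReduceOps guards against.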
class ParallelExecutorPassBuilder : public ir::PassBuilder {
......@@ -86,8 +90,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
auto multi_devices_pass = AppendPass("multi_devices_pass");
multi_devices_pass->SetNotOwned<const BuildStrategy>("strategy",
&strategy_);
multi_devices_pass->Set<int>("num_trainers",
new int(strategy_.num_trainers_));
// Add a graph print pass to record a graph with device info.
if (!strategy_.debug_graphviz_path_.empty()) {
......@@ -132,6 +134,7 @@ std::shared_ptr<ir::PassBuilder> BuildStrategy::CreatePassesFromStrategy(
std::unique_ptr<ir::Graph> BuildStrategy::Apply(
const ProgramDesc &main_program, const std::vector<platform::Place> &places,
const std::string &loss_var_name, const std::vector<Scope *> &local_scopes,
const size_t &nranks,
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
const bool use_cuda, platform::NCCLContextMap *nccl_ctxs) const {
#else
......@@ -150,6 +153,9 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply(
pass->Erase("local_scopes");
pass->SetNotOwned<const std::vector<Scope *>>("local_scopes",
&local_scopes);
pass->Erase("nranks");
pass->Set<size_t>("nranks", new size_t(nranks));
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
platform::NCCLContextMap *nctx = use_cuda ? nccl_ctxs : nullptr;
pass->Erase("nccl_ctxs");
......
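Note: a brief sketch of the attribute hand-off added here (producer in BuildStrategy::Apply, consumer in multi_devices_pass), assuming ir::Pass's Set/Get attribute interface:

// BuildStrategy::Apply stores the total rank count on the pass ...
pass->Erase("nranks");
pass->Set<size_t>("nranks", new size_t(nranks));  // the pass takes ownership

// ... and MultiDevSSAGraphBuilder::ApplyImpl reads it back via kNRanks.
size_t nranks = Get<size_t>(kNRanks);  // kNRanks == "nranks"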
......@@ -110,6 +110,7 @@ struct BuildStrategy {
const std::vector<platform::Place> &places,
const std::string &loss_var_name,
const std::vector<Scope *> &local_scopes,
const size_t &nranks,
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
const bool use_cuda,
platform::NCCLContextMap *nccl_ctxs) const;
......@@ -117,6 +118,13 @@ struct BuildStrategy {
const bool use_cuda) const;
#endif
// If set true, ParallelExecutor would build the main_program into multiple
// graphs, each of which runs on a single device. This approach can achieve
// better performance in some scenarios.
mutable bool enable_parallel_graph_ = false;
private:
mutable bool is_finalized_ = false;
mutable std::shared_ptr<ir::PassBuilder> pass_builder_;
......
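Note: a condensed, hedged sketch of when enable_parallel_graph_ ends up true, based on FLAGS_enable_parallel_graph and ParallelExecutor::EnableParallelGraphExecution later in this commit; has_selected_rows_vars and has_send_recv_ops stand in for the two scans over main_program.Block(0):

bool use_parallel_graph =
    FLAGS_enable_parallel_graph &&
    member_->use_all_reduce_ && member_->use_cuda_ &&
    !build_strategy.enable_sequential_execution_ &&
    exec_strategy.type_ != ExecutionStrategy::ExecutorType::kExperimental &&
    !has_selected_rows_vars &&  // sparse update is not supported yet
    !has_send_recv_ops;         // pserver mode is not supported yet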
......@@ -138,7 +138,7 @@ static const char kLossVarName[] = "loss_var_name";
static const char kPlaces[] = "places";
static const char kLocalScopes[] = "local_scopes";
static const char kStrategy[] = "strategy";
static const char kNumTrainers[] = "num_trainers";
static const char kNRanks[] = "nranks";
void MultiDevSSAGraphBuilder::Init() const {
all_vars_.clear();
......@@ -174,7 +174,7 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilder::ApplyImpl(
auto nodes = graph->ReleaseNodes();
ir::Graph &result = *graph;
int num_trainers = Get<int>(kNumTrainers);
size_t nranks = Get<size_t>(kNRanks);
for (auto &node : nodes) {
if (node->IsVar() && node->Var()) {
......@@ -251,7 +251,7 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilder::ApplyImpl(
CreateComputationalOps(&result, node, places_.size());
}
if (!is_forwarding && (places_.size() > 1 || num_trainers > 1)) {
if (!is_forwarding && nranks > 1UL) {
bool is_bk_op =
static_cast<bool>(boost::get<int>(node->Op()->GetAttr(
OpProtoAndCheckerMaker::OpRoleAttrName())) &
......@@ -649,12 +649,13 @@ int MultiDevSSAGraphBuilder::GetVarDeviceID(
void MultiDevSSAGraphBuilder::CreateScaleLossGradOp(
ir::Graph *result, const std::string &loss_grad_name,
ir::Node *out_var_node, proto::VarType::Type dtype) const {
size_t nranks = Get<size_t>("nranks");
for (size_t i = 0; i < places_.size(); ++i) {
// Insert ScaleCost OpHandle
auto *dev_ctx = platform::DeviceContextPool::Instance().Get(places_[i]);
auto *op_handle = new ScaleLossGradOpHandle(
result->CreateEmptyNode("scale_loss_grad", ir::Node::Type::kOperation),
local_scopes_.size(), local_scopes_[i], places_[i], dev_ctx, dtype);
nranks, local_scopes_[i], places_[i], dev_ctx, dtype);
result->Get<GraphOps>(kGraphOps).emplace_back(op_handle);
// FIXME: Currently ScaleLossGradOp only use device_count as scale
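// Illustrative example (numbers are made up): with num_trainers = 2 and 4
// places per trainer, nranks = 2 * 4 = 8, so the loss gradient is now scaled
// by 1/8 across all ranks instead of the per-process 1/4 used before.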
......@@ -887,4 +888,4 @@ REGISTER_PASS(multi_devices_pass,
.RequirePassAttr(paddle::framework::details::kPlaces)
.RequirePassAttr(paddle::framework::details::kLocalScopes)
.RequirePassAttr(paddle::framework::details::kStrategy)
.RequirePassAttr(paddle::framework::details::kNumTrainers);
.RequirePassAttr(paddle::framework::details::kNRanks);
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h"
namespace paddle {
namespace framework {
namespace details {
ParallelSSAGraphExecutor::ParallelSSAGraphExecutor(
const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::vector<std::unique_ptr<ir::Graph>> &&graphs)
: strategy_(std::move(strategy)),
local_scopes_(std::move(local_scopes)),
pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr),
places_(std::move(places)),
graphs_(std::move(graphs)) {
PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size());
// set the correct size of thread pool to each device.
strategy_.num_threads_ = strategy_.num_threads_ < places_.size()
? 1UL
: strategy_.num_threads_ / places_.size();
VLOG(1) << "set num_threads: " << strategy_.num_threads_
<< " to run the operators of the graph on each device.";
for (size_t i = 0; i < places.size(); ++i) {
executors_.emplace_back(new details::ThreadedSSAGraphExecutor(
strategy_, {local_scopes_[i]}, {places_[i]}, std::move(graphs_[i])));
}
}
FeedFetchList ParallelSSAGraphExecutor::Run(
const std::vector<std::string> &fetch_tensors) {
std::vector<std::future<FeedFetchList>> run_futures;
std::vector<FeedFetchList> fetch_data;
FeedFetchList ret;
fetch_data.reserve(places_.size());
ret.reserve(fetch_tensors.size());
exception_holder_.Clear();
for (size_t i = 0; i < places_.size(); ++i) {
auto call = [this, i, &fetch_tensors]() -> FeedFetchList {
try {
return executors_[i]->Run(fetch_tensors);
} catch (...) {
exception_holder_.Catch(std::current_exception());
}
return FeedFetchList();
};
if (pool_) {
run_futures.emplace_back(pool_->enqueue(std::move(call)));
} else {
fetch_data.emplace_back(std::move(call()));
}
}
if (pool_) {
for (auto &f : run_futures) {
if (exception_holder_.IsCaught()) {
f.wait();
} else {
fetch_data.emplace_back(std::move(f.get()));
}
}
}
if (exception_holder_.IsCaught()) {
exception_holder_.ReThrow();
}
for (size_t fetch_idx = 0; fetch_idx < fetch_tensors.size(); ++fetch_idx) {
std::vector<const LoDTensor *> lodtensor_ptrs;
lodtensor_ptrs.reserve(local_scopes_.size());
for (size_t scope_idx = 0; scope_idx < local_scopes_.size(); ++scope_idx) {
lodtensor_ptrs.push_back(&fetch_data.at(scope_idx).at(fetch_idx));
}
ret.emplace_back();
ret.back().MergeLoDTensor(lodtensor_ptrs, platform::CPUPlace());
}
return ret;
}
} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include <vector>
#include "ThreadPool.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
namespace paddle {
namespace framework {
namespace details {
class ParallelSSAGraphExecutor : public SSAGraphExecutor {
public:
ParallelSSAGraphExecutor(const ExecutionStrategy &strategy,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::vector<std::unique_ptr<ir::Graph>> &&graphs);
~ParallelSSAGraphExecutor() final = default;
const ir::Graph &Graph() const override { return *graphs_[0]; }
FeedFetchList Run(const std::vector<std::string> &fetch_tensors) override;
private:
ExecutionStrategy strategy_;
std::vector<Scope *> local_scopes_;
std::unique_ptr<::ThreadPool> pool_{nullptr};
std::vector<platform::Place> places_;
std::vector<std::unique_ptr<ir::Graph>> graphs_;
std::vector<std::unique_ptr<details::ThreadedSSAGraphExecutor>> executors_;
ExceptionHolder exception_holder_;
};
} // namespace details
} // namespace framework
} // namespace paddle
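Note: a hedged usage sketch of the new executor, mirroring how ParallelExecutor wires it up later in this commit (one pre-built graph per place, per-device fetch results merged on the CPU); the names, including BuildOneGraphPerPlace, are illustrative:

// graphs.size() == places.size(); each graph is executed by its own
// single-place ThreadedSSAGraphExecutor, and Run() merges the per-device
// fetch results into one FeedFetchList.
std::vector<std::unique_ptr<ir::Graph>> graphs = BuildOneGraphPerPlace();
std::unique_ptr<details::ParallelSSAGraphExecutor> executor(
    new details::ParallelSSAGraphExecutor(exec_strategy, local_scopes, places,
                                          std::move(graphs)));
FeedFetchList fetched = executor->Run({"loss"});  // "loss" is an example fetch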
......@@ -56,7 +56,7 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
}
}
std::vector<framework::LoDTensor> fetch_data;
std::exception_ptr eptr;
std::exception_ptr eptr = nullptr;
try {
fetch_data = underlying_executor_->Run(fetch_tensors);
} catch (...) {
......
......@@ -21,12 +21,9 @@ limitations under the License. */
#include "paddle/fluid/framework/ir/graph.h"
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
#include "paddle/fluid/platform/nccl_helper.h"
#endif
#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/reference_count_pass_helper.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
......@@ -38,6 +35,8 @@ limitations under the License. */
DEFINE_string(pe_profile_fname, "",
"Profiler filename for PE, which generated by gperftools."
"Only valid when compiled `WITH_PRIFILER=ON`. Empty if disable.");
DEFINE_bool(enable_parallel_graph, false,
            "If set false, force-disable the parallel graph execution mode.");
namespace paddle {
namespace framework {
......@@ -106,6 +105,7 @@ class ParallelExecutorPrivate {
bool own_local_scope_;
bool use_cuda_;
bool use_all_reduce_;
size_t nranks_;
// global_ref_cnts_ is only initialized when ParallelExecutor constructs, and
// then keeps unchanged
......@@ -201,6 +201,7 @@ ParallelExecutor::ParallelExecutor(
member_->build_strategy_ = build_strategy;
member_->use_all_reduce_ =
build_strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce;
member_->nranks_ = num_trainers * places.size();
if (!member_->use_all_reduce_) {
PADDLE_ENFORCE(places.size() > 1,
......@@ -224,62 +225,98 @@ ParallelExecutor::ParallelExecutor(
}
}
// FIXME(Yancey1989): parallel graph mode gets better performance
// in GPU allreduce distributed training. Need an elegant way to
// choose the execution strategy.
build_strategy.enable_parallel_graph_ =
EnableParallelGraphExecution(main_program, exec_strategy, build_strategy);
VLOG(1) << "Enable ParallelGraph Execution: "
<< build_strategy.enable_parallel_graph_;
if (member_->use_cuda_) {
// Bcast Parameters to all GPUs
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME);
ncclUniqueId *nccl_id = nullptr;
// gen_nccl_id operator can broadcast the ncclUniqueId for nccl2 collective
// distributed training
auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME);
if (nccl_id_var != nullptr) {
nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
}
if (build_strategy.enable_parallel_graph_ && member_->nranks_ > 1UL) {
if (nccl_id == nullptr) {
local_nccl_id_.reset(new ncclUniqueId());
platform::dynload::ncclGetUniqueId(local_nccl_id_.get());
nccl_id = local_nccl_id_.get();
}
}
member_->nccl_ctxs_.reset(new platform::NCCLContextMap(
member_->places_, nccl_id, num_trainers, trainer_id));
#else
PADDLE_THROW("Not compiled with CUDA");
#endif
}
if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
BCastParamsToDevices(bcast_vars);
}
// Startup Program has been run. All local scopes has correct parameters.
// Startup Program has been run. All local scopes has correct parameters.
// Step 2. Convert main_program to SSA form and dependency graph. Also, insert
// ncclOp
// Step 2. Convert main_program to SSA form and dependency graph. Also, insert
// ncclOp
std::vector<std::unique_ptr<ir::Graph>> graphs;
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
if (build_strategy.enable_parallel_graph_) {
for (size_t i = 0; i < member_->places_.size(); ++i) {
std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
main_program, {member_->places_[i]}, loss_var_name,
{member_->local_scopes_[i]}, member_->nranks_, member_->use_cuda_,
member_->nccl_ctxs_.get());
graphs.push_back(std::move(graph));
}
} else {
std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
main_program, member_->places_, loss_var_name, member_->local_scopes_,
member_->nranks_, member_->use_cuda_, member_->nccl_ctxs_.get());
graphs.push_back(std::move(graph));
}
#else
std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
main_program, member_->places_, loss_var_name, member_->local_scopes_,
member_->use_cuda_, member_->nccl_ctxs_.get());
#else
std::unique_ptr<ir::Graph> graph =
build_strategy.Apply(main_program, member_->places_, loss_var_name,
member_->local_scopes_, member_->use_cuda_);
member_->nranks_, member_->use_cuda_);
graphs.push_back(std::move(graph));
#endif
auto max_memory_size = GetEagerDeletionThreshold();
if (max_memory_size >= 0) {
graph = member_->PrepareGCAndRefCnts(std::move(graph),
static_cast<size_t>(max_memory_size));
for (size_t i = 0; i < graphs.size(); ++i) {
graphs[i] = member_->PrepareGCAndRefCnts(
std::move(graphs[i]), static_cast<size_t>(max_memory_size));
}
}
// Step 3. Create vars in each scope. Passes may also create new vars.
// skip control vars and empty vars
std::vector<details::VariableInfo> var_infos;
for (auto &node : graph->Nodes()) {
if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
var_infos.emplace_back();
var_infos.back().name_ = node->Var()->Name();
var_infos.back().type_ = node->Var()->GetType();
var_infos.back().persistable_ = node->Var()->Persistable();
for (auto &graph : graphs) {
for (auto &node : graph->Nodes()) {
if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
var_infos.emplace_back();
var_infos.back().name_ = node->Var()->Name();
var_infos.back().type_ = node->Var()->GetType();
var_infos.back().persistable_ = node->Var()->Persistable();
}
}
}
// If the loss_var_name is given, the number of graph should be only one.
if (loss_var_name.size()) {
size_t graph_num = ir::GraphNum(*graph);
size_t graph_num = ir::GraphNum(*graphs[0]);
if (graph_num > 1) {
LOG(WARNING)
<< "The number of graph should be only one, "
"but the current graph has "
<< ir::GraphNum(*graph)
<< ir::GraphNum(*graphs[0])
<< " sub_graphs. If you want to see the nodes of the "
"sub_graphs, you should use 'FLAGS_print_sub_graph_dir' "
"to specify the output dir. NOTES: if you not do training, "
......@@ -287,14 +324,20 @@ ParallelExecutor::ParallelExecutor(
}
}
if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
if (build_strategy.enable_parallel_graph_) {
member_->executor_.reset(new details::ParallelSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->places_,
std::move(graph)));
std::move(graphs)));
} else {
member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->places_,
std::move(graph)));
if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->places_,
std::move(graphs[0])));
} else {
member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->places_,
std::move(graphs[0])));
}
}
member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
......@@ -423,6 +466,36 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
}
}
bool ParallelExecutor::EnableParallelGraphExecution(
const ProgramDesc &main_program, const ExecutionStrategy &exec_strategy,
const BuildStrategy &build_strategy) const {
if (!FLAGS_enable_parallel_graph) return false;
bool enable_parallel_graph = true;
// TODO(Yancey1989): support sparse update in ParallelGraph mode.
for (auto &var_desc : main_program.Block(0).AllVars()) {
if (var_desc->GetType() == proto::VarType::SELECTED_ROWS) {
enable_parallel_graph = false;
}
}
// TODO(Yancey1989): support pserver mode
for (auto &op_desc : main_program.Block(0).AllOps()) {
if (op_desc->Type() == "send" || op_desc->Type() == "recv") {
enable_parallel_graph = false;
break;
}
}
if (!member_->use_all_reduce_ || !member_->use_cuda_)
enable_parallel_graph = false;
if (build_strategy.enable_sequential_execution_ ||
exec_strategy.type_ == ExecutionStrategy::ExecutorType::kExperimental)
enable_parallel_graph = false;
return enable_parallel_graph;
}
ParallelExecutor::~ParallelExecutor() {
for (auto &p : member_->places_) {
platform::DeviceContextPool::Instance().Get(p)->Wait();
......
......@@ -28,6 +28,10 @@ limitations under the License. */
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/platform/device_context.h"
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
#include "paddle/fluid/platform/nccl_helper.h"
#endif
namespace paddle {
namespace framework {
......@@ -68,8 +72,14 @@ class ParallelExecutor {
private:
void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const;
bool EnableParallelGraphExecution(const ProgramDesc &main_program,
const ExecutionStrategy &exec_strategy,
const BuildStrategy &build_strategy) const;
ParallelExecutorPrivate *member_;
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
std::unique_ptr<ncclUniqueId> local_nccl_id_;
#endif
};
} // namespace framework
......
......@@ -89,7 +89,6 @@ void ThreadPool::TaskLoop() {
task = std::move(tasks_.front());
tasks_.pop();
}
// run the task
task();
}
......
......@@ -12,6 +12,7 @@
See the License for the specific language governing permissions and
limitations under the License. */
#include <unordered_map>
#include "paddle/fluid/framework/data_layout_transform.h"
#include "paddle/fluid/memory/malloc.h"
#include "paddle/fluid/operators/conv_op.h"
......@@ -68,13 +69,22 @@ inline mkldnn::memory::format GetWeightsFormat(mkldnn::memory::format format,
}
}
template <typename T>
template <typename T, typename K>
class ConvMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
public:
void Compute(const paddle::framework::ExecutionContext& ctx) const override {
PADDLE_ENFORCE(paddle::platform::is_cpu_place(ctx.GetPlace()),
"It must use CPUPlace.");
bool is_INT8 =
std::is_same<T, int8_t>::value || std::is_same<T, uint8_t>::value;
if (!is_INT8) {
ComputeFP32(ctx);
} else {
ComputeINT8(ctx);
}
}
void ComputeFP32(const paddle::framework::ExecutionContext& ctx) const {
const bool is_test = ctx.Attr<bool>("is_test");
auto& dev_ctx =
......@@ -274,6 +284,257 @@ class ConvMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
output->set_layout(DataLayout::kMKLDNN);
output->set_format(GetMKLDNNFormat(*dst_memory_p));
}
void ComputeINT8(const paddle::framework::ExecutionContext& ctx) const {
const bool is_test = ctx.Attr<bool>("is_test");
auto& dev_ctx =
ctx.template device_context<paddle::platform::MKLDNNDeviceContext>();
const auto& mkldnn_engine = dev_ctx.GetEngine();
auto* input = ctx.Input<Tensor>("Input");
auto* filter = ctx.Input<Tensor>("Filter");
auto* bias = ctx.HasInput("Bias") ? ctx.Input<Tensor>("Bias") : nullptr;
auto* output = ctx.Output<Tensor>("Output");
PADDLE_ENFORCE(input->layout() == DataLayout::kMKLDNN &&
input->format() != memory::format::format_undef,
"Wrong layout/format set for Input tensor");
PADDLE_ENFORCE(filter->layout() == DataLayout::kMKLDNN &&
filter->format() != memory::format::format_undef,
"Wrong layout/format set for Filter tensor");
PADDLE_ENFORCE(input->dims().size() == 4 || input->dims().size() == 5,
"Input must be with 4 or 5 dimensions, i.e. NCHW or NCDHW");
PADDLE_ENFORCE(filter->dims().size() == 4 || filter->dims().size() == 5,
"Filter must be with 4 or 5 dimensions, i.e. OIHW or OIDHW");
if (bias) {
PADDLE_ENFORCE(bias->layout() == DataLayout::kMKLDNN &&
bias->format() != memory::format::format_undef,
"Wrong layout/format set for Bias tensor");
PADDLE_ENFORCE(bias->dims().size() == 1,
"Bias must only have 1 dimension, i.e. X");
}
std::vector<int> strides = ctx.Attr<std::vector<int>>("strides");
std::vector<int> paddings = ctx.Attr<std::vector<int>>("paddings");
std::vector<int> dilations = ctx.Attr<std::vector<int>>("dilations");
int groups = ctx.Attr<int>("groups");
bool force_fp32_output = ctx.Attr<bool>("force_fp32_output");
bool is_conv3d = strides.size() == 3U;
// TODO(tpatejko): add support for dilation
PADDLE_ENFORCE(
is_conv3d
? dilations.size() == 3 && dilations[0] == 1 && dilations[1] == 1 &&
dilations[2] == 1
: dilations.size() == 2 && dilations[0] == 1 && dilations[1] == 1,
"dilation in convolution is not implemented yet");
PADDLE_ENFORCE(is_conv3d != true, "int8 does not support conv3d currently");
const T* input_data = input->data<T>();
std::vector<int> src_tz = paddle::framework::vectorize2int(input->dims());
std::vector<int> weights_tz =
paddle::framework::vectorize2int(filter->dims());
int g = std::max(groups, 1);
GetWeightsTz(weights_tz, g, is_conv3d);
std::vector<int> dst_tz = paddle::framework::vectorize2int(output->dims());
// Get unique name for storing MKLDNN primitives
std::string key;
key.reserve(MaxKeyLength);
mkldnn::memory::data_type src_dt =
paddle::framework::ToMKLDNNDataType(input->type());
platform::ConvMKLDNNHandler::AppendKey(
&key, src_tz, weights_tz, strides, paddings, dilations, groups, src_dt,
input->format(), ctx.op().Output("Output"));
const std::string key_conv_pd = key + "@conv_pd";
std::shared_ptr<mkldnn::convolution_forward> conv_p = nullptr;
std::shared_ptr<mkldnn::memory> src_memory_p = nullptr;
std::shared_ptr<mkldnn::memory> user_src_memory_p = nullptr;
std::shared_ptr<mkldnn::memory> dst_memory_p = nullptr;
std::vector<primitive> pipeline;
std::shared_ptr<mkldnn::convolution_forward::primitive_desc> conv_pd =
nullptr;
std::shared_ptr<platform::ConvMKLDNNHandler> handler = nullptr;
auto prim_key = key + "@conv_p";
auto dst_key = key + "@dst_mem_p";
auto src_key = key + "@src_mem_p";
auto user_src_key = key + "@user_src_mem_p";
auto src_reorder_key = key + "@src_mem_preorder_p";
conv_p = std::static_pointer_cast<mkldnn::convolution_forward>(
dev_ctx.GetBlob(prim_key));
if (conv_p == nullptr || !is_test) {
const K* filter_data = filter->data<K>();
auto scale_in_data = ctx.Attr<float>("Scale_in");
auto scale_weights_data = ctx.Attr<std::vector<float>>("Scale_weights");
auto scale_out_data =
force_fp32_output ? 1.0f : ctx.Attr<float>("Scale_out");
bool is_multi_channel = scale_weights_data.size() > 1;
int count = is_multi_channel ? (g > 1 ? (weights_tz)[1] * (weights_tz)[0]
: (weights_tz)[0])
: 1;
std::vector<float> output_shift_scale(count);
#pragma omp parallel for if (count > 1)
for (int i = 0; i < count; i++) {
if (scale_weights_data[i] == 0.0)
output_shift_scale[i] =
scale_out_data; // weights data will contain 0
// in some models, then weights
// scale couldn't be calculated
else
output_shift_scale[i] =
scale_out_data / (scale_in_data * scale_weights_data[i]);
}
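// Worked example with illustrative numbers (not taken from any model): with
// Scale_in = 50.f, Scale_weights[i] = 100.f and Scale_out = 25.f, the value is
// output_shift_scale[i] = 25 / (50 * 100) = 0.005, i.e. the s32 convolution
// accumulator is rescaled into the requested int8 (or fp32 when
// force_fp32_output) output range.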
auto user_src_md =
platform::MKLDNNMemDesc({src_tz}, src_dt, input->format());
auto user_weights_md = platform::MKLDNNMemDesc(
{weights_tz}, platform::MKLDNNGetDataType<K>(),
((g) == 1) ? mkldnn::memory::format::oihw
: mkldnn::memory::format::goihw);
/* create memory descriptor for convolution without specified format
* ('any') which lets a primitive (convolution in this case) choose
* the memory format preferred for best performance
*/
std::string data_format = ctx.Attr<std::string>("data_format");
auto chosen_memory_format =
platform::data_format_to_memory_format(data_format);
std::vector<int> bias_tz;
auto src_md =
platform::MKLDNNMemDesc(src_tz, src_dt, chosen_memory_format);
auto weights_md = platform::MKLDNNMemDesc(
weights_tz, memory::data_type::s8, chosen_memory_format);
auto dst_dt = force_fp32_output
? paddle::framework::ToMKLDNNDataType(
framework::DataTypeTrait<float>::DataType)
: paddle::framework::ToMKLDNNDataType(
framework::DataTypeTrait<int8_t>::DataType);
auto dst_md =
platform::MKLDNNMemDesc(dst_tz, dst_dt, chosen_memory_format);
// create a conv primitive descriptor and save it for usage in backward
if (bias) {
bias_tz = paddle::framework::vectorize2int(bias->dims());
auto bias_md = platform::MKLDNNMemDesc(bias_tz, memory::data_type::s32,
memory::format::x);
conv_pd = ConvFwdPrimitiveDesc(src_md, weights_md, bias_md, dst_md,
strides, paddings, mkldnn_engine,
output_shift_scale, is_test);
} else {
conv_pd =
ConvFwdPrimitiveDesc(src_md, weights_md, dst_md, strides, paddings,
mkldnn_engine, output_shift_scale, is_test);
}
// Save conv_pd/src_memory/weights_memory for backward pass
dev_ctx.SetBlob(key_conv_pd, conv_pd);
handler.reset(new platform::ConvMKLDNNHandler(conv_pd, dev_ctx,
mkldnn_engine, key));
// create mkldnn memory from input tensors (data/weights)
user_src_memory_p =
handler->AcquireSrcMemory(user_src_md, to_void_cast<T>(input_data));
auto user_weights_memory_p = handler->AcquireWeightsMemory(
user_weights_md, to_void_cast<K>(filter_data));
// create reorder primitive if the input format is not the preferred one
src_memory_p =
handler->AcquireSrcMemoryFromPrimitive(user_src_memory_p, pipeline);
std::shared_ptr<mkldnn::memory> weights_memory_p;
int mask_reorder =
is_multi_channel ? ((g != 1) ? (1 << 1) + (1 << 0) : 1 << 0) : 0;
weights_memory_p = handler->AcquireWeightsMemoryFromPrimitive(
user_weights_memory_p, pipeline, is_test, true, scale_weights_data,
mask_reorder);
if (!force_fp32_output) {
dst_memory_p = platform::SetDstMemory<int8_t>(ctx, output, handler);
} else {
dst_memory_p = platform::SetDstMemory<float>(ctx, output, handler);
}
// create convolution op primitive
auto scale_bias_key = key + "@scale_bias";
if (bias) {
const float* bias_data = bias->data<float>();
auto user_bias_md = platform::MKLDNNMemDesc(
{bias_tz}, platform::MKLDNNGetDataType<float>(), memory::format::x);
auto user_bias_memory_p = handler->AcquireBiasMemory(
user_bias_md, to_void_cast<float>(bias_data));
std::shared_ptr<mkldnn::memory> bias_memory_p;
int mask_reorder = is_multi_channel ? 1 << 0 : 1;
int count =
is_multi_channel
? (g > 1 ? (weights_tz)[1] * (weights_tz)[0] : (weights_tz)[0])
: 1;
std::vector<float> scale_bias_data(count);
#pragma omp parallel for if (count > 1)
for (int i = 0; i < count; i++) {
scale_bias_data[i] = scale_in_data * scale_weights_data[i];
}
bias_memory_p = handler->AcquireBiasMemoryFromPrimitive(
user_bias_memory_p, pipeline, is_test, true, scale_bias_data,
mask_reorder);
conv_p = handler->AcquireConvolution(src_memory_p, weights_memory_p,
bias_memory_p, dst_memory_p);
} else {
conv_p = handler->AcquireConvolution(src_memory_p, weights_memory_p,
dst_memory_p);
}
// push primitive to stream and wait until it's executed
pipeline.push_back(*conv_p);
} else {
auto src_memory_reorder_p = std::static_pointer_cast<mkldnn::memory>(
dev_ctx.GetBlob(src_reorder_key));
src_memory_p =
std::static_pointer_cast<mkldnn::memory>(dev_ctx.GetBlob(src_key));
if (src_memory_reorder_p) {
user_src_memory_p = std::static_pointer_cast<mkldnn::memory>(
dev_ctx.GetBlob(user_src_key));
user_src_memory_p->set_data_handle(to_void_cast<T>(input_data));
} else if (src_memory_p) {
src_memory_p->set_data_handle(to_void_cast<T>(input_data));
}
dst_memory_p =
std::static_pointer_cast<mkldnn::memory>(dev_ctx.GetBlob(dst_key));
conv_pd =
std::static_pointer_cast<mkldnn::convolution_forward::primitive_desc>(
dev_ctx.GetBlob(key_conv_pd));
if (conv_pd) {
handler.reset(new platform::ConvMKLDNNHandler(conv_pd, dev_ctx,
mkldnn_engine, key));
}
if (!force_fp32_output) {
dst_memory_p =
platform::SetDstMemoryHandler<int8_t>(ctx, output, handler);
} else {
dst_memory_p =
platform::SetDstMemoryHandler<float>(ctx, output, handler);
}
if (src_memory_reorder_p) {
pipeline.push_back(*src_memory_reorder_p);
}
pipeline.push_back(*conv_p);
}
// push primitive to stream and wait until it's executed
stream(stream::kind::eager).submit(pipeline).wait();
output->set_layout(DataLayout::kMKLDNN);
output->set_format(GetMKLDNNFormat(*dst_memory_p));
}
private:
mkldnn::primitive_attr CreatePostOps(bool fuse_relu,
......@@ -301,6 +562,16 @@ class ConvMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
return conv_attr;
}
mkldnn::primitive_attr CreatePostOps(
const std::vector<float> output_shift_scale) const {
mkldnn::primitive_attr conv_attr;
mkldnn::post_ops post_operations;
int mask = output_shift_scale.size() > 1 ? 1 << 1 : 0;
conv_attr.set_output_scales(mask, output_shift_scale);
conv_attr.set_post_ops(post_operations);
return conv_attr;
}
std::unique_ptr<mkldnn::convolution_forward::primitive_desc>
ConvFwdPrimitiveDesc(const memory::desc& src, const memory::desc& weights,
const memory::desc& dst, const std::vector<int>& strides,
......@@ -325,6 +596,32 @@ class ConvMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
p_conv_pd);
}
std::unique_ptr<mkldnn::convolution_forward::primitive_desc>
ConvFwdPrimitiveDesc(const memory::desc& src, const memory::desc& weights,
const memory::desc& dst, const std::vector<int>& strides,
const std::vector<int>& paddings,
const mkldnn::engine& engine,
const std::vector<float> output_shift_scale,
bool is_test) const {
memory::dims stride_dims = {strides[0], strides[1]};
memory::dims padding_dims = {paddings[0], paddings[1]};
auto propagation = is_test ? mkldnn::prop_kind::forward_scoring
: mkldnn::prop_kind::forward_training;
auto conv_desc = mkldnn::convolution_forward::desc(
propagation, mkldnn::convolution_direct, src, weights, dst, stride_dims,
padding_dims, padding_dims, mkldnn::padding_kind::zero);
mkldnn::primitive_attr conv_attr = CreatePostOps(output_shift_scale);
auto p_conv_pd = new mkldnn::convolution_forward::primitive_desc(
conv_desc, conv_attr, engine);
return std::unique_ptr<mkldnn::convolution_forward::primitive_desc>(
p_conv_pd);
}
std::unique_ptr<mkldnn::convolution_forward::primitive_desc>
ConvFwdPrimitiveDesc(const memory::desc& src, const memory::desc& weights,
const memory::desc& bias, const memory::desc& dst,
......@@ -349,6 +646,33 @@ class ConvMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
return std::unique_ptr<mkldnn::convolution_forward::primitive_desc>(
p_conv_pd);
}
std::unique_ptr<mkldnn::convolution_forward::primitive_desc>
ConvFwdPrimitiveDesc(const memory::desc& src, const memory::desc& weights,
const memory::desc& bias, const memory::desc& dst,
const std::vector<int>& strides,
const std::vector<int>& paddings,
const mkldnn::engine& engine,
const std::vector<float> output_shift_scale,
bool is_test) const {
memory::dims stride_dims = {strides[0], strides[1]};
memory::dims padding_dims = {paddings[0], paddings[1]};
auto propagation = is_test ? mkldnn::prop_kind::forward_scoring
: mkldnn::prop_kind::forward_training;
auto conv_desc = mkldnn::convolution_forward::desc(
propagation, mkldnn::convolution_direct, src, weights, bias, dst,
stride_dims, padding_dims, padding_dims, mkldnn::padding_kind::zero);
mkldnn::primitive_attr conv_attr = CreatePostOps(output_shift_scale);
auto p_conv_pd = new mkldnn::convolution_forward::primitive_desc(
conv_desc, conv_attr, engine);
return std::unique_ptr<mkldnn::convolution_forward::primitive_desc>(
p_conv_pd);
}
};
template <typename T>
......@@ -555,7 +879,17 @@ namespace ops = paddle::operators;
REGISTER_OP_KERNEL_WITH_CUSTOM_TYPE(conv2d, MKLDNN,
::paddle::platform::CPUPlace, FP32,
ops::kConvMKLDNNFP32,
ops::ConvMKLDNNOpKernel<float>);
ops::ConvMKLDNNOpKernel<float, float>);
REGISTER_OP_KERNEL_WITH_CUSTOM_TYPE(conv2d, MKLDNN,
::paddle::platform::CPUPlace, U8,
ops::kConvMKLDNNFP32,
ops::ConvMKLDNNOpKernel<uint8_t, float>);
REGISTER_OP_KERNEL_WITH_CUSTOM_TYPE(conv2d, MKLDNN,
::paddle::platform::CPUPlace, S8,
ops::kConvMKLDNNFP32,
ops::ConvMKLDNNOpKernel<int8_t, float>);
REGISTER_OP_KERNEL_WITH_CUSTOM_TYPE(conv2d_grad, MKLDNN,
::paddle::platform::CPUPlace, FP32,
......@@ -565,7 +899,7 @@ REGISTER_OP_KERNEL_WITH_CUSTOM_TYPE(conv2d_grad, MKLDNN,
REGISTER_OP_KERNEL_WITH_CUSTOM_TYPE(conv3d, MKLDNN,
::paddle::platform::CPUPlace, FP32,
ops::kConvMKLDNNFP32,
ops::ConvMKLDNNOpKernel<float>);
ops::ConvMKLDNNOpKernel<float, float>);
REGISTER_OP_KERNEL_WITH_CUSTOM_TYPE(conv3d_grad, MKLDNN,
::paddle::platform::CPUPlace, FP32,
......
......@@ -98,10 +98,12 @@ framework::OpKernelType ConvOp::GetExpectedKernelType(
#endif
auto input_data_type = ctx.Input<Tensor>("Input")->type();
auto filter_data_type = ctx.Input<Tensor>("Filter")->type();
PADDLE_ENFORCE_EQ(input_data_type, filter_data_type,
"input and filter data type should be consistent");
if (input_data_type != framework::proto::VarType::INT8 &&
input_data_type != framework::proto::VarType::UINT8) {
auto filter_data_type = ctx.Input<Tensor>("Filter")->type();
PADDLE_ENFORCE_EQ(input_data_type, filter_data_type,
"input and filter data type should be consistent");
}
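// The MKL-DNN INT8 conv kernels take u8/s8 input while the Filter tensor is
// still passed in as FP32 and quantized inside the kernel, so the input/filter
// dtype consistency check above is only meaningful for non-INT8 inputs.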
if (input_data_type == framework::proto::VarType::FP16) {
PADDLE_ENFORCE_EQ(library, framework::LibraryType::kCUDNN,
"float16 can only be used when CUDNN is used");
......@@ -179,6 +181,26 @@ void Conv2DOpMaker::Make() {
"whenever convolution output is as an input to residual "
"connection.")
.SetDefault(false);
AddAttr<float>("Scale_in",
"Scale_in to be used for int8 input data."
"Only used with MKL-DNN INT8.")
.SetDefault(1.0f);
AddAttr<float>("Scale_out",
"Scale_out to be used for int8 output data."
"Only used with MKL-DNN INT8.")
.SetDefault(1.0f);
AddAttr<float>("Scale_in_eltwise",
"Scale_in_eltwise to be used for int8 eltwise input data."
"Only used with MKL-DNN INT8.")
.SetDefault(1.0f);
AddAttr<std::vector<float>>("Scale_weights",
"Scale_weights to be used for int8 weights data."
"Only used with MKL-DNN INT8.")
.SetDefault({1.0f});
AddAttr<bool>("force_fp32_output",
"(bool, default false) Force INT8 kernel output FP32, only "
"used in MKL-DNN INT8")
.SetDefault(false);
AddAttr<std::string>(
"data_format",
"(string, default NCHW) Only used in "
......@@ -303,6 +325,9 @@ void Conv3DOpMaker::Make() {
"Defaults to \"NHWC\". Specify the data format of the output data, "
"the input will be transformed automatically. ")
.SetDefault("AnyLayout");
AddAttr<bool>("force_fp32_output",
"(bool, default false) Only used in mkldnn INT8 kernel")
.SetDefault(false);
// TODO(dzhwinter): need to registered layout transform function
AddAttr<int>("workspace_size_MB",
"Only used in cudnn kernel. workspace size for cudnn, in MB, "
......
......@@ -29,6 +29,7 @@ namespace operators {
using Tensor = framework::Tensor;
constexpr int kConvMKLDNNFP32 = 1;
constexpr int kConvMKLDNNINT8 = 2;
constexpr int MaxKeyLength = 256;
// Base convolution operator definitions for other conv
// like operators to reuse the implementation.
......
......@@ -32,7 +32,7 @@ namespace paddle {
namespace operators {
namespace distributed {
using Tensor = framework::Tensor;
using LoDTensor = framework::LoDTensor;
using LoDTensor = framework::LoDTensor;
using SelectedRows = framework::SelectedRows;
using DDim = framework::DDim;
......@@ -117,6 +117,12 @@ static void MergeMultipleVarsIntoOneBySection(
auto& id_tensor = scope->FindVar(id_name)->Get<framework::LoDTensor>();
auto* out_tensor =
scope->FindVar(out_name)->GetMutable<framework::LoDTensor>();
PADDLE_ENFORCE_GT(
out_tensor->numel(), 0,
"When calling this method, the LoDTensor's numel must larger than zero. "
"Please check LoDTensor::Resize has been called first.");
auto* out_tensor_data = out_tensor->mutable_data<float>(id_tensor.place());
bool is_on_cpu_place = true;
......@@ -138,7 +144,7 @@ static void MergeMultipleVarsIntoOneBySection(
auto row_numel = dims[1];
for (size_t i = 0; i < dims[0]; ++i) {
for (int64_t i = 0; i < dims[0]; ++i) {
auto id = ids_in_this_section[i];
auto origin_id = id + abs_sections[section_idx];
auto& offsets = id_to_offset[origin_id];
......@@ -172,8 +178,9 @@ void prefetch(const std::string& id_name, const std::string& out_name,
const std::vector<std::string>& table_names,
const std::vector<std::string>& epmap,
const std::vector<int>& height_sections,
const framework::ExecutionContext& context) {
auto& local_scope = context.scope().NewScope();
const framework::ExecutionContext& context,
const framework::Scope& scope) {
auto& local_scope = scope.NewScope();
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& cpu_ctx = *pool.Get(platform::CPUPlace());
......@@ -190,11 +197,11 @@ void prefetch(const std::string& id_name, const std::string& out_name,
out_var_names.push_back(out_name + "@" + epmap[i]);
}
auto& id_tensor = local_scope.FindVar(id_name)->Get<framework::LoDTensor>();
auto& id_tensor = scope.FindVar(id_name)->Get<framework::LoDTensor>();
std::vector<int64_t> ids_vector;
if (platform::is_cpu_place(id_tensor.place())) {
auto* id_data = id_tensor.data<int64_t>();
for (size_t i = 0; i < id_tensor.numel(); ++i) {
for (int64_t i = 0; i < id_tensor.numel(); ++i) {
ids_vector.push_back(id_data[i]);
}
} else {
......@@ -202,7 +209,7 @@ void prefetch(const std::string& id_name, const std::string& out_name,
PADDLE_THROW("paddle is not compiled with CUDA!");
#else
auto cpu_place = platform::CPUPlace();
framework::Tensor cpu_tensor;
framework::LoDTensor cpu_tensor;
auto* cpu_tensor_data =
cpu_tensor.mutable_data<int64_t>(id_tensor.dims(), cpu_place);
auto stream =
......@@ -246,8 +253,7 @@ void prefetch(const std::string& id_name, const std::string& out_name,
MergeMultipleVarsIntoOneBySection(id_name, ids_vector, out_name,
out_var_names, height_sections, splited_ids,
context, &local_scope, &actual_ctx);
context.scope().DeleteScope(&local_scope);
scope.DeleteScope(&local_scope);
}
}; // namespace distributed
......
......@@ -27,7 +27,56 @@ void prefetch(const std::string& id_name, const std::string& out_name,
const std::vector<std::string>& table_names,
const std::vector<std::string>& epmap,
const std::vector<int>& height_sections,
const framework::ExecutionContext& context);
const framework::ExecutionContext& context,
const framework::Scope& scope);
template <typename T>
void prefetch_with_reconstruct(const std::string& id_name,
const std::string& out_name,
const std::vector<std::string>& table_names,
const std::vector<std::string>& epmap,
const std::vector<int>& height_sections,
const framework::ExecutionContext& context,
const framework::Scope& scope,
framework::LoDTensor* original) {
prefetch(id_name, out_name, table_names, epmap, height_sections, context,
scope);
auto& out = scope.FindVar(out_name)->Get<framework::LoDTensor>();
auto& ids = scope.FindVar(id_name)->Get<framework::LoDTensor>();
auto* original_value = original->data<T>();
auto* out_value = out.data<T>();
size_t original_width = original->numel() / original->dims()[0];
bool is_on_cpu_place = true;
if (!platform::is_cpu_place(ids.place())) {
is_on_cpu_place = false;
}
if (is_on_cpu_place) {
for (int64_t i = 0; i < ids.numel(); i++) {
const T* out_rows = out_value + original_width * i;
T* original_row =
original_value + original_width * ids.data<int64_t>()[i];
std::memcpy(original_row, out_rows, original_width * sizeof(T));
}
} else {
#ifndef PADDLE_WITH_CUDA
PADDLE_THROW("paddle is not compiled with CUDA!");
#else
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& actual_ctx = *pool.Get(context.GetPlace());
for (int64_t i = 0; i < ids.numel(); i++) {
const T* out_rows = out_value + original_width * i;
T* original_row =
original_value + original_width * ids.data<int64_t>()[i];
auto stream =
static_cast<platform::CUDADeviceContext*>(&actual_ctx)->stream();
memory::Copy(boost::get<platform::CUDAPlace>(ids.place()), original_row,
platform::CPUPlace(), out_rows, original_width * sizeof(T),
stream);
}
#endif
}
}
}; // namespace distributed
}; // namespace operators
......
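Note: a hedged sketch of how the new prefetch_with_reconstruct<T> helper is meant to be called, mirroring the hierarchical_sigmoid change later in this commit; the variable names are illustrative:

// Fetch the rows of W selected by "Ids@Prefetch" from the parameter servers in
// `epmap`, then scatter the fetched rows back into `w_out` at their original
// row indices so the local full-size parameter stays consistent.
operators::distributed::prefetch_with_reconstruct<float>(
    "Ids@Prefetch", "W@Prefetch", table_names, epmap, height_sections, ctx,
    local_scope, w_out);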
......@@ -2,7 +2,9 @@ include(operators)
register_operators(EXCLUDES fusion_transpose_flatten_concat_op fusion_conv_inception_op)
if (WITH_GPU)
op_library(fusion_transpose_flatten_concat_op)
op_library(fusion_conv_inception_op)
file(APPEND ${pybind_file} "USE_CUDA_ONLY_OP(fusion_transpose_flatten_concat);\n")
file(APPEND ${pybind_file} "USE_CUDA_ONLY_OP(conv2d_inception_fusion);\n")
if (NOT ${CUDNN_VERSION} VERSION_LESS 7100)
op_library(fusion_conv_inception_op)
file(APPEND ${pybind_file} "USE_CUDA_ONLY_OP(conv2d_inception_fusion);\n")
endif()
endif()
......@@ -21,7 +21,7 @@ DECLARE_uint64(conv_workspace_size_limit);
namespace paddle {
namespace operators {
#if CUDNN_VERSION >= 7001
#if CUDNN_VERSION >= 7100
using Tensor = framework::Tensor;
using ScopedTensorDescriptor = platform::ScopedTensorDescriptor;
using ScopedFilterDescriptor = platform::ScopedFilterDescriptor;
......@@ -264,7 +264,7 @@ class CUDNNConvInceptionFusionOpKernel : public framework::OpKernel<T> {
} // namespace operators
} // namespace paddle
#if CUDNN_VERSION >= 7001
#if CUDNN_VERSION >= 7100
namespace ops = paddle::operators;
REGISTER_OP_CUDA_KERNEL(conv2d_inception_fusion,
ops::CUDNNConvInceptionFusionOpKernel<float>,
......
......@@ -67,6 +67,11 @@ class HierarchicalSigmoidOp : public framework::OperatorWithKernel {
PADDLE_ENFORCE(ctx->HasOutput("Out"), "Output(Out) should not be null.");
PADDLE_ENFORCE(ctx->HasOutput("PreOut"),
"Output(PreOut) should not be null.");
auto with_prefetch = ctx->Attrs().Get<bool>("remote_prefetch");
if (with_prefetch) {
PADDLE_ENFORCE(ctx->HasOutput("W_Out"),
"Output(W_Out) should not be null.");
}
const int64_t batch_size = ctx->GetInputDim("X")[0];
std::vector<int64_t> output_shape({batch_size, 1});
ctx->SetOutputDim("Out", framework::make_ddim(output_shape));
......@@ -95,7 +100,7 @@ class HierarchicalSigmoidOpMaker : public framework::OpProtoAndCheckerMaker {
AddInput("Label",
"(LoDTensor, required), The labels of training data. It's a"
"tensor with shape [N, 1].");
AddInput("PTable",
AddInput("PathTable",
"(LoDTensor, optional), The Path Table from root to current word"
"it should have shape like [N, L], L is the length of the Path")
.AsDispensable();
......@@ -119,8 +124,30 @@ class HierarchicalSigmoidOpMaker : public framework::OpProtoAndCheckerMaker {
"[batch_size, code_length], where code_length represents the "
"maximum path length from root to leaf nodes.")
.AsIntermediate();
AddOutput(
"W_Out",
"(LoDTensor, optinal) using input 'W' as Output to make it mutable"
"When we are using prefetch")
.AsIntermediate();
AddAttr<AttrType>("num_classes", "(int, optional), The number of classes")
.SetDefault(2);
// for parameter prefetch
AddAttr<bool>("remote_prefetch", "").SetDefault(false);
AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0);
AddAttr<std::vector<int>>("height_sections",
"Height for each output SelectedRows.")
.SetDefault(std::vector<int>({}));
AddAttr<std::vector<std::string>>(
"epmap",
"(string vector, default 127.0.0.1:6164)"
"Server endpoints in the order of input variables for mapping")
.SetDefault({});
AddAttr<std::vector<std::string>>(
"table_names",
"(string vector, the splited table names that will be fetched from "
"parameter server)"
"in the order of input variables for mapping")
.SetDefault({});
AddComment(R"DOC(
The hierarchical sigmoid operator organize the classes into a binary tree.
At each node, a sigmoid function is used to calculate the probability of
......@@ -189,23 +216,17 @@ class HierarchicalSigmoidGradOpGradVarTypeInference
<< " is set to SelectedRows";
block->Var(w_grad_var_name)
->SetType(framework::proto::VarType::SELECTED_ROWS);
if (hasBias) {
VLOG(30) << "hierarchical_sigmoid_grad op "
<< framework::GradVarName("Bias") << " is set to SelectedRows";
block->Var(bias_grad_var_name)
->SetType(framework::proto::VarType::SELECTED_ROWS);
}
} else {
VLOG(30) << "hierarchical_sigmoid_grad op " << framework::GradVarName("W")
<< " is set to LoDTensor";
block->Var(w_grad_var_name)
->SetType(framework::proto::VarType::LOD_TENSOR);
if (hasBias) {
VLOG(30) << "hierarchical_sigmoid_grad op "
<< framework::GradVarName("Bias") << " is set to LoDTensor";
block->Var(bias_grad_var_name)
->SetType(framework::proto::VarType::LOD_TENSOR);
}
}
if (hasBias) {
VLOG(30) << "hierarchical_sigmoid_grad op "
<< framework::GradVarName("Bias") << " is set to LoDTensor";
block->Var(bias_grad_var_name)
->SetType(framework::proto::VarType::LOD_TENSOR);
}
block->Var(w_grad_var_name)->SetDataType(block->Var("W")->GetDataType());
}
......
......@@ -14,7 +14,9 @@ limitations under the License. */
#pragma once
#include <iostream>
#include <iterator>
#include <set>
#include <string>
#include <vector>
#include "paddle/fluid/framework/mixed_vector.h"
#include "paddle/fluid/framework/op_registry.h"
......@@ -24,6 +26,10 @@ limitations under the License. */
#include "paddle/fluid/operators/math/matrix_bit_code.h"
#include "paddle/fluid/platform/transform.h"
#ifdef PADDLE_WITH_DISTRIBUTE
#include "paddle/fluid/operators/distributed/parameter_prefetch.h"
#endif
namespace paddle {
namespace operators {
......@@ -34,8 +40,9 @@ using platform::Transform;
static std::vector<int64_t> PathToRows(const framework::LoDTensor& path) {
std::set<int64_t> rows;
const int64_t* paths = path.data<int64_t>();
for (int64_t i = 0; i < path.numel(); ++i) {
int64_t row = path.data<int64_t>()[i];
int64_t row = paths[i];
if (row < 0) {
continue;
}
......@@ -49,13 +56,54 @@ class HierarchicalSigmoidOpKernel : public framework::OpKernel<T> {
void Compute(const framework::ExecutionContext& ctx) const override {
auto& in = detail::Ref(ctx.Input<framework::LoDTensor>("X"));
auto& w = detail::Ref(ctx.Input<framework::LoDTensor>("W"));
auto* path = ctx.Input<framework::LoDTensor>("PTable");
auto* path = ctx.Input<framework::LoDTensor>("PathTable");
auto* code = ctx.Input<framework::LoDTensor>("PathCode");
auto& label = detail::Ref(ctx.Input<framework::LoDTensor>("Label"));
auto* bias = ctx.Input<framework::LoDTensor>("Bias");
auto* out = ctx.Output<framework::LoDTensor>("Out");
auto* pre_out = ctx.Output<framework::LoDTensor>("PreOut");
size_t num_classes = static_cast<size_t>(ctx.Attr<int>("num_classes"));
// for remote prefetch
auto epmap = ctx.Attr<std::vector<std::string>>("epmap");
if (!epmap.empty()) {
// if epmap is not empty, then the parameter will be fetched from remote
// parameter
// server
auto height_sections = ctx.Attr<std::vector<int>>("height_sections");
auto table_names = ctx.Attr<std::vector<std::string>>("table_names");
std::vector<int64_t> real_rows = PathToRows(*path);
framework::Scope& local_scope = ctx.scope().NewScope();
auto* ids = local_scope.Var("Ids@Prefetch");
auto* x_tensor = ids->GetMutable<framework::LoDTensor>();
x_tensor->mutable_data<int64_t>(
framework::make_ddim({static_cast<int64_t>(real_rows.size()), 1}),
ctx.GetPlace());
// copy.
std::memcpy(x_tensor->data<int64_t>(), real_rows.data(),
real_rows.size() * sizeof(int64_t));
framework::DDim w_dims = ctx.Input<Tensor>("W")->dims();
w_dims[0] = x_tensor->dims()[0];
auto* w_tensor =
local_scope.Var("W@Prefetch")->GetMutable<framework::LoDTensor>();
w_tensor->Resize(w_dims);
#ifdef PADDLE_WITH_DISTRIBUTE
// W_Out is only used by prefetch; never change it in other cases
auto* w_out = ctx.Output<framework::LoDTensor>("W_Out");
operators::distributed::prefetch_with_reconstruct<T>(
"Ids@Prefetch", "W@Prefetch", table_names, epmap, height_sections,
ctx, local_scope, w_out);
#else
PADDLE_THROW(
"paddle is not compiled with distribute support, can not do "
"parameter prefetch!");
#endif
}
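// Summary: the rows of W needed by this batch are derived from PathTable via
// PathToRows, staged as "Ids@Prefetch"/"W@Prefetch" in a temporary scope,
// fetched from the parameter servers listed in `epmap`, and written back into
// W_Out so the op keeps computing against a full-size weight tensor.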
bool is_custom = false;
if (path) {
is_custom = true;
......@@ -116,9 +164,8 @@ class HierarchicalSigmoidGradOpKernel : public framework::OpKernel<T> {
void Compute(const framework::ExecutionContext& ctx) const override {
auto& in = detail::Ref(ctx.Input<framework::LoDTensor>("X"));
auto& w = detail::Ref(ctx.Input<framework::LoDTensor>("W"));
auto* path = ctx.Input<framework::LoDTensor>("PTable");
auto* path = ctx.Input<framework::LoDTensor>("PathTable");
auto* code = ctx.Input<framework::LoDTensor>("PathCode");
auto* bias = ctx.Input<framework::LoDTensor>("Bias");
auto* in_grad =
ctx.Output<framework::LoDTensor>(framework::GradVarName("X"));
bool is_sparse = ctx.Attr<bool>("is_sparse");
......@@ -173,15 +220,14 @@ class HierarchicalSigmoidGradOpKernel : public framework::OpKernel<T> {
}
// TODO(guosheng): multiply pre_out_grad with subgradient of clipping to
// be consistent with the clipping in forward.
auto* bias_grad =
ctx.Output<framework::LoDTensor>(framework::GradVarName("Bias"));
if (bias_grad) {
bias_grad->mutable_data<T>(ctx.GetPlace());
zero(dev_ctx, bias_grad, static_cast<T>(0.0));
bit_code->AddGrad(pre_out_grad, bias_grad);
}
if (!is_sparse) {
auto* bias_grad =
ctx.Output<framework::LoDTensor>(framework::GradVarName("Bias"));
if (bias_grad) {
bias_grad->mutable_data<T>(ctx.GetPlace());
zero(dev_ctx, bias_grad, static_cast<T>(0.0));
bit_code->AddGrad(pre_out_grad, bias_grad);
}
auto* w_grad =
ctx.Output<framework::LoDTensor>(framework::GradVarName("W"));
w_grad->mutable_data<T>(ctx.GetPlace());
......@@ -200,21 +246,6 @@ class HierarchicalSigmoidGradOpKernel : public framework::OpKernel<T> {
w_grad_value->mutable_data<T>(temp_dim, ctx.GetPlace());
zero(dev_ctx, w_grad_value, static_cast<T>(0.0));
auto* bias_grad =
ctx.Output<framework::SelectedRows>(framework::GradVarName("Bias"));
if (bias_grad) {
bias_grad->set_rows(real_rows);
// build ids -> rows index map
bias_grad->SyncIndex();
bias_grad->set_height(bias->dims()[0]);
auto* bias_grad_value = bias_grad->mutable_value();
std::vector<int64_t> dims = {static_cast<int64_t>(real_rows.size()),
bias->dims()[1]};
bias_grad_value->mutable_data<T>(framework::make_ddim(dims),
ctx.GetPlace());
zero(dev_ctx, bias_grad_value, static_cast<T>(0.0));
bit_code->AddGrad(pre_out_grad, bias_grad);
}
bit_code->MulGradWeight(pre_out_grad, w_grad, in);
}
bit_code->MulGradError(pre_out_grad, w, in_grad);
......
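The remote-prefetch branch added to HierarchicalSigmoidOpKernel above only fetches the rows of W that actually appear in a batch: PathToRows collects the unique non-negative ids from the path table, they are copied into an Ids@Prefetch tensor, and W@Prefetch is sized to one row per id. A minimal NumPy sketch of that row-gathering step, using invented toy data rather than the operator's real inputs:

import numpy as np

# Toy path table: each row lists the non-leaf node ids on one label's path,
# padded with -1, the same convention as the PathTable input above.
path_table = np.array([[0, 2, -1, -1],
                       [0, 1, 3, -1]], dtype=np.int64)

# PathToRows: the unique non-negative ids are the rows of W worth prefetching.
real_rows = sorted({int(v) for v in path_table.ravel() if v >= 0})

# These ids become an Ids@Prefetch tensor of shape [len(real_rows), 1];
# the prefetched W@Prefetch then holds one row of W per id.
ids_prefetch = np.array(real_rows, dtype=np.int64).reshape(-1, 1)
print(ids_prefetch.ravel())   # [0 1 2 3]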
......@@ -92,7 +92,8 @@ class LookupTableCUDAKernel : public framework::OpKernel<T> {
// server
#ifdef PADDLE_WITH_DISTRIBUTE
operators::distributed::prefetch(id_name, out_name, table_names, epmap,
height_sections, context);
height_sections, context,
context.scope());
#else
PADDLE_THROW(
"paddle is not compiled with distribute support, can not do "
......
......@@ -59,7 +59,8 @@ class LookupTableKernel : public framework::OpKernel<T> {
// server
#ifdef PADDLE_WITH_DISTRIBUTE
operators::distributed::prefetch(id_name, out_name, table_names, epmap,
height_sections, context);
height_sections, context,
context.scope());
#else
PADDLE_THROW(
"paddle is not compiled with distribute support, can not do "
......
......@@ -84,41 +84,6 @@ void MatrixBitCodeFunctor<T>::AddGrad(const framework::Tensor &tmat,
code_table_.apply_visitor(func);
}
template <typename T>
struct MatrixBitCodeFunctorSelectedRowsAddGrad
: public boost::static_visitor<void> {
const framework::Tensor &tmat_;
framework::SelectedRows *vec_;
MatrixBitCodeFunctorSelectedRowsAddGrad(const framework::Tensor &tmat,
framework::SelectedRows *vec)
: tmat_(tmat), vec_(vec) {}
template <typename CodeTable>
void operator()(const CodeTable &code_table) {
size_t batch_size = tmat_.dims()[0];
size_t width = tmat_.dims()[1];
auto *vec_data = vec_->mutable_value()->template data<T>();
auto *tmat_data = tmat_.data<T>();
for (size_t i = 0; i < batch_size; ++i) {
auto code = code_table.get_code(i);
int code_length = code.get_length();
for (int j = 0; j < code_length; ++j) {
size_t index = code.calc_index(j);
int64_t row_index = vec_->GetIndexFromId(static_cast<int64_t>(index));
vec_data[row_index] += tmat_data[i * width + j];
}
}
}
};
template <typename T>
void MatrixBitCodeFunctor<T>::AddGrad(const framework::Tensor &tmat,
framework::SelectedRows *vec) {
MatrixBitCodeFunctorSelectedRowsAddGrad<T> func(tmat, vec);
code_table_.apply_visitor(func);
}
template <typename T>
struct MatrixBitCodeFunctorSum : public boost::static_visitor<void> {
const framework::Tensor &tmat_;
......
......@@ -124,11 +124,12 @@ class SimpleCode {
template <typename T>
class CustomCode {
public:
CustomCode(const framework::Tensor& ptable, const framework::Tensor& pcode,
const int64_t* ids, int index) {
seq_len_ = ptable.dims()[1];
ptable_data_ = ptable.data<T>() + seq_len_ * index;
pcode_data_ = pcode.data<T>() + seq_len_ * index;
CustomCode(const framework::Tensor& path_table,
const framework::Tensor& path_code, const int64_t* ids,
int index) {
seq_len_ = path_table.dims()[1];
path_table_data_ = path_table.data<T>() + seq_len_ * index;
path_code_data_ = path_code.data<T>() + seq_len_ * index;
}
/**
* Here the id of root should be 1 rather than 0, thus the encoding of class c
......@@ -139,25 +140,25 @@ class CustomCode {
 * The binary classification path is the suffix of the encoding, so the
 * left-most bit is left out in calc_bit.
*/
size_t calc_index(int bit) const { return ptable_data_[bit]; }
bool calc_bit(int bit) const { return pcode_data_[bit]; }
size_t calc_index(int bit) const { return path_table_data_[bit]; }
bool calc_bit(int bit) const { return path_code_data_[bit]; }
// NOTE: this function is not thread-safe.
int get_length() const {
if (length_ < 0) {
auto len = seq_len_;
length_ =
static_cast<int>(std::find_if(ptable_data_, ptable_data_ + len,
[](const T& val) { return val < 0; }) -
ptable_data_);
length_ = static_cast<int>(
std::find_if(path_table_data_, path_table_data_ + len,
[](const T& val) { return val < 0; }) -
path_table_data_);
}
return length_;
}
private:
int64_t seq_len_;
const T* ptable_data_;
const T* pcode_data_;
const T* path_table_data_;
const T* path_code_data_;
mutable int length_{-1};
};
......@@ -181,9 +182,9 @@ class SimpleCodeTable {
template <typename T>
class CustomCodeTable {
public:
CustomCodeTable(const framework::Tensor& ptable,
const framework::Tensor& pcode, const int64_t* ids)
: ptable_(ptable), pcode_(pcode), ids_(ids) {}
CustomCodeTable(const framework::Tensor& path_table,
const framework::Tensor& path_code, const int64_t* ids)
: ptable_(path_table), pcode_(path_code), ids_(ids) {}
CustomCode<T> get_code(int64_t code) const {
return CustomCode<T>(ptable_, pcode_, ids_, code);
......@@ -210,11 +211,11 @@ class MatrixBitCodeFunctor {
ids_(ids),
code_table_(SimpleCodeTable(num_classes, ids)) {}
MatrixBitCodeFunctor(const framework::Tensor& ptable,
const framework::Tensor& pcode, const int64_t* ids)
: num_classes_(static_cast<size_t>(ptable.dims()[1])),
MatrixBitCodeFunctor(const framework::Tensor& path_table,
const framework::Tensor& path_code, const int64_t* ids)
: num_classes_(static_cast<size_t>(path_table.dims()[1])),
ids_(ids),
code_table_(CustomCodeTable<int64_t>(ptable, pcode, ids)) {}
code_table_(CustomCodeTable<int64_t>(path_table, path_code, ids)) {}
/* For j < code_length
tmat(i, j) += vec(0, index(i, j))
*/
......@@ -225,11 +226,6 @@ class MatrixBitCodeFunctor {
*/
void AddGrad(const framework::Tensor& tmat, framework::Tensor* vec);
/* For selected rows For j < code_length
vec(0, index(i, j)) += tmat(i, j)
*/
void AddGrad(const framework::Tensor& tmat, framework::SelectedRows* vec);
/* For j < code_length
sum(i, 0) = \sum_j bit(i, j) * tmat(i, j)
*/
......
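To make the path-table/path-code convention above concrete, here is a small Python sketch of a CustomCode-like helper for a single sample: calc_index reads the node id at a given step of the path, calc_bit reads the 0/1 branch decision, and get_length stops at the first negative entry. The toy rows are made up for illustration only.

class ToyCustomCode(object):
    # Per-sample view of a custom coding path (illustrative sketch only).
    def __init__(self, path_table_row, path_code_row):
        self.table = path_table_row   # non-leaf node ids, padded with -1
        self.code = path_code_row     # 0/1 branch taken at each node

    def get_length(self):
        # Length is the position of the first negative entry, if any.
        for j, v in enumerate(self.table):
            if v < 0:
                return j
        return len(self.table)

    def calc_index(self, bit):
        return self.table[bit]        # which binary classifier (row of W) to use

    def calc_bit(self, bit):
        return bool(self.code[bit])   # target of that binary classifier

code = ToyCustomCode([0, 2, -1, -1], [1, 0, -1, -1])
print(code.get_length())                                        # 2
print([code.calc_index(j) for j in range(code.get_length())])   # [0, 2]
print([code.calc_bit(j) for j in range(code.get_length())])     # [True, False]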
......@@ -153,6 +153,24 @@ class NCEOpMaker : public framework::OpProtoAndCheckerMaker {
AddAttr<bool>("is_sparse", "(boolean, default false) Sparse update.")
.SetDefault(false);
// for parameter prefetch
AddAttr<bool>("remote_prefetch", "").SetDefault(false);
AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0);
AddAttr<std::vector<int>>("height_sections",
"Height for each output SelectedRows.")
.SetDefault(std::vector<int>({}));
AddAttr<std::vector<std::string>>(
"epmap",
"(string vector, default 127.0.0.1:6164)"
"Server endpoints in the order of input variables for mapping")
.SetDefault({});
AddAttr<std::vector<std::string>>(
"table_names",
"(string vector, the splited table names that will be fetched from "
"parameter server)"
"in the order of input variables for mapping")
.SetDefault({});
AddAttr<std::vector<int>>("custom_neg_classes",
"This attribute only be used in unitest. Classes "
"in this list wiil be used as negative classes "
......@@ -222,24 +240,20 @@ class NCEOpGradVarTypeInference : public framework::VarTypeInference {
void operator()(const framework::OpDesc &op_desc,
framework::BlockDesc *block) const override {
auto weight_grad = op_desc.Output(framework::GradVarName("Weight")).front();
auto bias_grad = op_desc.Output(framework::GradVarName("Bias")).front();
auto attr = op_desc.GetAttr("is_sparse");
bool is_sparse = boost::get<bool>(attr);
if (is_sparse) {
VLOG(3) << "nce_op_grad op " << weight_grad << " and " << bias_grad
VLOG(3) << "nce_op_grad op " << weight_grad << " and "
<< " is set to SelectedRows";
block->Var(weight_grad)
->SetType(framework::proto::VarType::SELECTED_ROWS);
block->Var(bias_grad)->SetType(framework::proto::VarType::SELECTED_ROWS);
} else {
VLOG(3) << "nce_op_grad op " << weight_grad << " and " << bias_grad
VLOG(3) << "nce_op_grad op " << weight_grad << " and "
<< " is set to LoDTensor";
block->Var(weight_grad)->SetType(framework::proto::VarType::LOD_TENSOR);
block->Var(bias_grad)->SetType(framework::proto::VarType::LOD_TENSOR);
}
block->Var(weight_grad)->SetDataType(block->Var("Input")->GetDataType());
block->Var(bias_grad)->SetDataType(block->Var("Input")->GetDataType());
}
};
......
......@@ -15,8 +15,10 @@ limitations under the License. */
#pragma once
#include <math.h>
#include <iterator>
#include <random>
#include <set>
#include <string>
#include <vector>
#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h"
......@@ -24,6 +26,10 @@ limitations under the License. */
#include "paddle/fluid/operators/math/sampler.h"
#include "unsupported/Eigen/CXX11/Tensor"
#ifdef PADDLE_WITH_DISTRIBUTE
#include "paddle/fluid/operators/distributed/parameter_prefetch.h"
#endif
namespace paddle {
namespace operators {
......@@ -43,7 +49,6 @@ void PrepareSamples(const framework::ExecutionContext &context,
auto label = context.Input<Tensor>("Label");
const int64_t *label_data = label->data<int64_t>();
auto label_dims = label->dims();
// int num_total_classes = context.Attr<int>("num_total_classes");
  // for unit test
std::vector<int> custom_neg_classes =
context.Attr<std::vector<int>>("custom_neg_classes");
......@@ -144,15 +149,82 @@ class NCEKernel : public framework::OpKernel<T> {
}
// forward mul
auto input_mat = EigenMatrix<T>::From(*(context.Input<Tensor>("Input")));
auto weight_mat = EigenMatrix<T>::From(*(context.Input<Tensor>("Weight")));
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
Eigen::Tensor<T, 0, Eigen::RowMajor, Eigen::DenseIndex> result =
(input_mat.chip(static_cast<int>(i / sample_labels->dims()[1]), 0) *
weight_mat.chip(sample_labels_data[i], 0))
.sum();
sample_out_data[i] += result(0);
sample_out_data[i] = (1. / (1. + exp(-sample_out_data[i])));
// for remote prefetch
auto epmap = context.Attr<std::vector<std::string>>("epmap");
if (!epmap.empty()) {
      // If epmap is not empty, the parameter will be fetched from the
      // remote parameter server.
std::vector<int64_t> labels;
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
labels.push_back(sample_labels_data[i]);
}
std::set<T> st(labels.begin(), labels.end());
labels.assign(st.begin(), st.end());
framework::Scope &local_scope = context.scope().NewScope();
auto height_sections = context.Attr<std::vector<int>>("height_sections");
auto table_names = context.Attr<std::vector<std::string>>("table_names");
auto *ids = local_scope.Var("Ids@Prefetch");
auto *x_tensor = ids->GetMutable<framework::LoDTensor>();
x_tensor->mutable_data<int64_t>(
framework::make_ddim({static_cast<int64_t>(labels.size()), 1}),
context.GetPlace());
// copy.
std::memcpy(x_tensor->data<int64_t>(), labels.data(),
labels.size() * sizeof(int64_t));
std::vector<int> w_dims = paddle::framework::vectorize2int(
context.Input<Tensor>("Weight")->dims());
w_dims[0] = static_cast<int>(labels.size());
auto *w_tensor = local_scope.Var("Weight@Prefetch")
->GetMutable<framework::LoDTensor>();
w_tensor->Resize(framework::make_ddim(w_dims));
#ifdef PADDLE_WITH_DISTRIBUTE
operators::distributed::prefetch("Ids@Prefetch", "Weight@Prefetch",
table_names, epmap, height_sections,
context, local_scope);
#else
PADDLE_THROW(
"paddle is not compiled with distribute support, can not do "
"parameter prefetch!");
#endif
auto weight_mat = EigenMatrix<T>::From(
(local_scope.Var("Weight@Prefetch")->Get<framework::LoDTensor>()));
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
std::vector<int64_t>::iterator it =
std::find(labels.begin(), labels.end(), sample_labels_data[i]);
int idx = std::distance(labels.begin(), it);
Eigen::Tensor<T, 0, Eigen::RowMajor, Eigen::DenseIndex> result =
(input_mat.chip(static_cast<int>(i / sample_labels->dims()[1]), 0) *
weight_mat.chip(idx, 0))
.sum();
sample_out_data[i] += result(0);
sample_out_data[i] = (1. / (1. + exp(-sample_out_data[i])));
}
context.scope().DeleteScope(&local_scope);
} else {
auto weight_mat =
EigenMatrix<T>::From(*(context.Input<Tensor>("Weight")));
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
Eigen::Tensor<T, 0, Eigen::RowMajor, Eigen::DenseIndex> result =
(input_mat.chip(static_cast<int>(i / sample_labels->dims()[1]), 0) *
weight_mat.chip(sample_labels_data[i], 0))
.sum();
sample_out_data[i] += result(0);
sample_out_data[i] = (1. / (1. + exp(-sample_out_data[i])));
}
}
// forward cost
for (int64_t i = 0; i < sample_labels->dims()[0]; ++i) {
out_data[i] = 0;
......@@ -240,18 +312,19 @@ class NCEGradKernel : public framework::OpKernel<T> {
sample_grad_data[i] *= d_out_data[sample_idx];
}
// get d_bias
auto d_bias = context.Output<Tensor>(framework::GradVarName("Bias"));
if (d_bias != nullptr) {
T *d_bias_data = d_bias->mutable_data<T>(context.GetPlace());
std::fill(d_bias_data, d_bias_data + d_bias->numel(), 0.0);
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
d_bias_data[sample_labels_data[i]] += sample_grad_data[i];
}
}
bool is_sparse = context.Attr<bool>("is_sparse");
if (!is_sparse) {
// get d_bias
auto d_bias = context.Output<Tensor>(framework::GradVarName("Bias"));
if (d_bias != nullptr) {
T *d_bias_data = d_bias->mutable_data<T>(context.GetPlace());
std::fill(d_bias_data, d_bias_data + d_bias->numel(), 0.0);
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
d_bias_data[sample_labels_data[i]] += sample_grad_data[i];
}
}
// get d_w
auto d_w = context.Output<Tensor>(framework::GradVarName("Weight"));
if (d_w != nullptr) {
......@@ -273,34 +346,6 @@ class NCEGradKernel : public framework::OpKernel<T> {
std::set<T> st(labels.begin(), labels.end());
labels.assign(st.begin(), st.end());
auto *bias_var = context.InputVar("Bias");
DDim bias_dim;
if (bias_var->IsType<LoDTensor>()) {
bias_dim = context.Input<LoDTensor>("Bias")->dims();
} else if (bias_var->IsType<SelectedRows>()) {
auto *table_t = context.Input<SelectedRows>("Bias");
bias_dim = table_t->value().dims();
} else {
PADDLE_THROW(
"The parameter Bias of a NCE_OP "
"must be either LoDTensor or SelectedRows");
}
auto d_bias =
context.Output<SelectedRows>(framework::GradVarName("Bias"));
d_bias->set_rows(labels);
d_bias->set_height(bias_dim[0]);
d_bias->mutable_value()->Resize(
{static_cast<int64_t>(labels.size()), bias_dim[1]});
T *d_bias_data =
d_bias->mutable_value()->mutable_data<T>(context.GetPlace());
std::fill(d_bias_data, d_bias_data + labels.size(), 0.0);
for (int64_t i = 0; i < sample_labels->numel(); ++i) {
d_bias_data[d_bias->Index(sample_labels_data[i])] +=
sample_grad_data[i];
}
auto *table_var = context.InputVar("Weight");
DDim table_dim;
if (table_var->IsType<LoDTensor>()) {
......
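The prefetch branch of the NCE kernel above de-duplicates the sampled labels, fetches only the matching rows of Weight into Weight@Prefetch, and then remaps every sampled label to its row index in that compacted tensor (the std::find / std::distance step). A hedged NumPy sketch of that bookkeeping, on toy values:

import numpy as np

sample_labels = np.array([4, 1, 4, 7, 1], dtype=np.int64)   # toy sampled ids

# Unique, sorted labels are what ends up in Ids@Prefetch.
labels = np.unique(sample_labels)                            # [1 4 7]

# Stand-in for the prefetched Weight@Prefetch rows (one row per unique label).
weight_prefetch = np.arange(len(labels) * 3, dtype=np.float32).reshape(-1, 3)

# Remap each sampled label to its row in the compacted tensor and gather.
row_of = {int(l): i for i, l in enumerate(labels)}
rows = np.array([row_of[int(l)] for l in sample_labels])
gathered = weight_prefetch[rows]                             # shape [5, 3]
print(rows)              # [1 0 1 2 0]
print(gathered.shape)    # (5, 3)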
......@@ -49,7 +49,7 @@ void MonitorThread(std::vector<ReaderThreadStatus>* thread_status,
class CTRReader : public framework::FileReader {
public:
explicit CTRReader(const std::shared_ptr<LoDTensorBlockingQueue>& queue,
int batch_size, int thread_num,
int batch_size, size_t thread_num,
const std::vector<std::string>& slots,
const std::vector<std::string>& file_list)
: batch_size_(batch_size), slots_(slots), file_list_(file_list) {
......
......@@ -145,7 +145,8 @@ class MKLDNNHandler {
const std::shared_ptr<mkldnn::memory> user_memory_p,
const std::string& suffix,
std::vector<mkldnn::primitive>& pipeline, // NOLINT
bool is_persistent = false) {
bool is_persistent = false, bool is_INT8 = false,
std::vector<float> scale_data = {1.0f}, int mask = 0) {
// create reorder primitive if the input format is not the preferred one
auto local_key = key_ + suffix;
auto key_reorder_p = key_ + suffix + "reorder_p";
......@@ -159,8 +160,20 @@ class MKLDNNHandler {
std::shared_ptr<mkldnn::primitive> reorder_p;
if (mpd != user_mpd) {
target_memory_p = std::make_shared<mkldnn::memory>(mpd);
auto reorder_p =
std::make_shared<mkldnn::reorder>(*user_memory_p, *target_memory_p);
std::shared_ptr<mkldnn::reorder> reorder_p;
if (is_INT8) {
        // Attributes for the int8 weights/bias data reorder.
        mkldnn::primitive_attr attri;
attri.set_output_scales(mask, scale_data);
auto reorder_pd = std::shared_ptr<mkldnn::reorder::primitive_desc>(
new mkldnn::reorder::primitive_desc(user_mpd, mpd, attri));
reorder_p = std::shared_ptr<mkldnn::reorder>(new mkldnn::reorder(
*reorder_pd, *user_memory_p, *target_memory_p));
} else {
reorder_p = std::make_shared<mkldnn::reorder>(*user_memory_p,
*target_memory_p);
}
dev_ctx_.SetBlob(key_reorder_p, reorder_p);
pipeline.push_back(*reorder_p);
}
......@@ -182,22 +195,56 @@ class MKLDNNHandler {
return dims2str(operand_dims) + suffix;
}
template <typename M>
template <typename T>
static void SetDstMemory(
const framework::ExecutionContext& ctx, framework::Tensor* output,
std::vector<int> dst_tz, const mkldnn::engine& engine,
std::shared_ptr<mkldnn::memory::primitive_desc>& dst_pd, // NOLINT
std::shared_ptr<mkldnn::memory>& dst_memory) { // NOLINT
M* output_data = output->mutable_data<M>(ctx.GetPlace());
T* output_data = output->mutable_data<T>(ctx.GetPlace());
auto dst_md = platform::MKLDNNMemDesc(
{dst_tz}, paddle::framework::ToMKLDNNDataType(
framework::DataTypeTrait<M>::DataType),
framework::DataTypeTrait<T>::DataType),
mkldnn::memory::format::nhwc);
dst_pd.reset(new mkldnn::memory::primitive_desc(dst_md, engine));
dst_memory.reset(new mkldnn::memory(*dst_pd, to_void_cast<M>(output_data)));
dst_memory.reset(new mkldnn::memory(*dst_pd, to_void_cast<T>(output_data)));
}
static void AppendKey(
std::string* key, const mkldnn::memory::dims& input_dims,
const mkldnn::memory::dims& weights_dims, const std::vector<int>& strides,
const std::vector<int>& paddings, const std::vector<int>& dilations,
const int& groups, const mkldnn::memory::data_type& type,
const mkldnn::memory::format& format, const std::string& suffix) {
AppendKeyDims(key, input_dims);
AppendKeyDims(key, weights_dims);
AppendKeyVec(key, strides);
AppendKeyVec(key, paddings);
AppendKeyVec(key, dilations);
AppendKey(key, std::to_string(groups));
AppendKey(key, std::to_string(type));
AppendKey(key, std::to_string(format));
AppendKey(key, suffix);
}
protected:
static void AppendKeyDims(std::string* key,
const mkldnn::memory::dims& dims) {
for (unsigned int i = 0; i < dims.size(); i++) {
AppendKey(key, std::to_string(dims[i]));
}
}
static void AppendKeyVec(std::string* key, const std::vector<int>& dims) {
for (unsigned int i = 0; i < dims.size(); i++) {
AppendKey(key, std::to_string(dims[i]));
}
}
static void AppendKey(std::string* key, const std::string& s) {
key->append(s);
}
static std::string dims2str(const mkldnn::memory::dims& operand_dims) {
std::string dstr = "";
for (size_t i = 0; i < operand_dims.size(); ++i) {
......@@ -215,7 +262,8 @@ class MKLDNNHandler {
class TransposeMKLDNNHandler : public MKLDNNHandler {
public:
TransposeMKLDNNHandler(std::vector<int>& dims, std::vector<int>& axis,
TransposeMKLDNNHandler(std::vector<int>& dims, // NOLINT
std::vector<int>& axis, // NOLINT
const platform::MKLDNNDeviceContext& dev_ctx,
mkldnn::engine engine, const std::string& base_key)
: platform::MKLDNNHandler(dev_ctx, engine, base_key),
......@@ -303,8 +351,9 @@ class TransposeMKLDNNHandler : public MKLDNNHandler {
}
protected:
mkldnn_memory_desc_t Axis2MemoryDesc(std::vector<int>& nchw_tz,
std::vector<int>& axis) {
mkldnn_memory_desc_t Axis2MemoryDesc(std::vector<int>& nchw_tz, // NOLINT
std::vector<int>& axis // NOLINT
) {
mkldnn_memory_desc_t mem_fmt;
mem_fmt.primitive_kind = mkldnn_memory;
......@@ -462,21 +511,26 @@ class ConvMKLDNNTemplateHandler : public MKLDNNHandler {
std::shared_ptr<mkldnn::memory> AcquireWeightsMemoryFromPrimitive(
const std::shared_ptr<mkldnn::memory> user_weights_memory_p,
std::vector<mkldnn::primitive>& pipeline, // NOLINT
bool is_persistent = false) {
bool is_persistent = false, bool is_INT8 = false,
std::vector<float> scale_data = {1.0f}, int mask = 0) {
auto user_weights_pd = user_weights_memory_p->get_primitive_desc();
auto weights_pd = conv_pd_->weights_primitive_desc();
return this->AcquireMemory(weights_pd, user_weights_pd,
user_weights_memory_p, "@weights_mem_p",
pipeline, is_persistent);
return this->AcquireMemory(
weights_pd, user_weights_pd, user_weights_memory_p, "@weights_mem_p",
pipeline, is_persistent, is_INT8, scale_data, mask);
}
std::shared_ptr<mkldnn::memory> AcquireBiasMemoryFromPrimitive(
const std::shared_ptr<mkldnn::memory> user_bias_memory_p,
std::vector<mkldnn::primitive>& pipeline) { // NOLINT
std::vector<mkldnn::primitive>& pipeline, // NOLINT
bool is_persistent = false, bool is_INT8 = false,
std::vector<float> scale_data = {1.0f},
int mask = 0) { // NOLINT
auto user_bias_pd = user_bias_memory_p->get_primitive_desc();
auto bias_pd = conv_pd_->bias_primitive_desc();
return this->AcquireMemory(bias_pd, user_bias_pd, user_bias_memory_p,
"@bias_mem_p", pipeline);
"@bias_mem_p", pipeline, is_persistent, is_INT8,
scale_data, mask);
}
std::shared_ptr<forward_t> AcquireConvolution(
......@@ -594,5 +648,29 @@ using ConvTransposeMKLDNNHandler =
ConvMKLDNNTemplateHandler<mkldnn::deconvolution_forward,
mkldnn::deconvolution_backward_data,
mkldnn::deconvolution_backward_weights>;
template <typename T>
static std::shared_ptr<mkldnn::memory> SetDstMemory(
const framework::ExecutionContext& ctx, framework::Tensor* output,
const std::shared_ptr<ConvMKLDNNHandler>& handler) {
T* output_data = output->mutable_data<T>(
ctx.GetPlace(), ::paddle::memory::Allocator::kDefault,
handler->GetDstMemorySize());
std::shared_ptr<mkldnn::memory> dst_memory_p =
handler->AcquireDstMemoryFromPrimitive(to_void_cast<T>(output_data));
return dst_memory_p;
}
template <typename T>
static std::shared_ptr<mkldnn::memory> SetDstMemoryHandler(
const framework::ExecutionContext& ctx, framework::Tensor* output,
const std::shared_ptr<ConvMKLDNNHandler>& handler) {
T* output_data = output->mutable_data<T>(
ctx.GetPlace(), ::paddle::memory::Allocator::kDefault,
handler->GetDstMemorySize());
std::shared_ptr<mkldnn::memory> dst_memory_p;
dst_memory_p->set_data_handle(to_void_cast<T>(output_data));
return dst_memory_p;
}
} // namespace platform
} // namespace paddle
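The is_INT8 path added to AcquireMemory above attaches output scales to the weights/bias reorder, so the float parameters are rescaled while being converted for the int8 convolution; per the MKL-DNN convention, mask == 0 means one common scale and a non-zero bit pattern means per-channel scales. The effect of that rescaling, sketched in plain NumPy (the scale values here are invented, and this is not the MKL-DNN API itself):

import numpy as np

weights = np.random.uniform(-1, 1, size=(4, 3, 3, 3)).astype(np.float32)  # OIHW

# mask == 0: a single scale for the whole tensor.
single_scale = np.float32(127.0)
w_s8_common = np.clip(np.round(weights * single_scale), -128, 127).astype(np.int8)

# mask selecting the output-channel dim: one scale per output channel.
per_oc = 127.0 / (np.abs(weights).reshape(4, -1).max(axis=1) + 1e-8)
w_s8_per_oc = np.clip(np.round(weights * per_oc[:, None, None, None]),
                      -128, 127).astype(np.int8)

print(w_s8_common.dtype, w_s8_per_oc.shape)   # int8 (4, 3, 3, 3)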
......@@ -106,7 +106,7 @@ struct NCCLContextMap {
}
std::unique_ptr<ncclComm_t[]> comms(new ncclComm_t[order_.size()]);
    // If num_trainers == 1 and no nccl_id is given, create a new nccl id for local comms.
if (num_trainers == 1) {
if (num_trainers == 1 && nccl_id == nullptr) {
std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex());
PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
comms.get(), static_cast<int>(order_.size()), order_.data()));
......
......@@ -12,9 +12,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/port.h"
#include <algorithm>
#include <iomanip>
#include <limits>
......@@ -25,9 +22,12 @@ limitations under the License. */
#ifdef PADDLE_WITH_CUDA
#include <cuda.h>
#endif // PADDLE_WITH_CUDA
#include "glog/logging.h"
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/platform/device_tracer.h"
#include "paddle/fluid/platform/port.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/string/printf.h"
DEFINE_bool(enable_rpc_profiler, false, "Enable rpc profiler or not.");
......@@ -173,8 +173,9 @@ void PopEvent(const std::string& name, const DeviceContext* dev_ctx) {
RecordEvent::RecordEvent(const std::string& name, const DeviceContext* dev_ctx)
: is_enabled_(false), start_ns_(PosixInNsec()) {
std::lock_guard<std::mutex> l(profiler_mu);
if (g_state == ProfilerState::kDisabled) return;
std::lock_guard<std::mutex> l(profiler_mu);
is_enabled_ = true;
dev_ctx_ = dev_ctx;
name_ = name;
......@@ -184,8 +185,8 @@ RecordEvent::RecordEvent(const std::string& name, const DeviceContext* dev_ctx)
}
RecordEvent::~RecordEvent() {
std::lock_guard<std::mutex> l(profiler_mu);
if (g_state == ProfilerState::kDisabled || !is_enabled_) return;
std::lock_guard<std::mutex> l(profiler_mu);
DeviceTracer* tracer = GetDeviceTracer();
if (tracer) {
tracer->AddCPURecords(CurAnnotation(), start_ns_, PosixInNsec(),
......
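The RecordEvent changes above move the kDisabled early return ahead of the mutex acquisition, so a disabled profiler never touches the lock on the hot path. The same pattern in a tiny self-contained Python sketch (names are illustrative):

import threading

_profiler_mu = threading.Lock()
_profiler_enabled = False   # stands in for g_state != ProfilerState::kDisabled

def record_event(name):
    # Cheap check first: when profiling is disabled, no lock is taken at all.
    if not _profiler_enabled:
        return
    with _profiler_mu:
        print("recording", name)   # push the event under the lock

record_event("forward")   # a no-op while profiling is disabled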
......@@ -918,11 +918,11 @@ function main() {
cmake_gen ${PYTHON_ABI:-""}
build
assert_api_not_changed ${PYTHON_ABI:-""}
assert_api_spec_approvals
run_test
gen_capi_package
gen_fluid_lib
test_fluid_lib
assert_api_spec_approvals
;;
assert_api)
assert_api_not_changed ${PYTHON_ABI:-""}
......
......@@ -102,13 +102,6 @@ def __bootstrap__():
import sys
import os
import platform
if os.name == 'nt':
third_lib_path = os.path.abspath(os.path.dirname(
__file__)) + os.sep + '..' + os.sep + 'libs'
os.environ['path'] += ';' + third_lib_path
sys.path.append(third_lib_path)
from . import core
in_test = 'unittest' in sys.modules
......@@ -135,7 +128,8 @@ def __bootstrap__():
'free_idle_memory', 'paddle_num_threads', "dist_threadpool_size",
'eager_delete_tensor_gb', 'fast_eager_deletion_mode',
'allocator_strategy', 'reader_queue_speed_test_mode',
'print_sub_graph_dir', 'pe_profile_fname', 'warpctc_dir'
'print_sub_graph_dir', 'pe_profile_fname', 'warpctc_dir',
'enable_parallel_graph'
]
if 'Darwin' not in sysstr:
read_env_flags.append('use_pinned_memory')
......@@ -158,14 +152,10 @@ def __bootstrap__():
if core.is_compiled_with_cuda():
read_env_flags += [
'fraction_of_gpu_memory_to_use',
'cudnn_deterministic',
'enable_cublas_tensor_op_math',
'conv_workspace_size_limit',
'cudnn_exhaustive_search',
'memory_optimize_debug',
'selected_gpus',
'cudnn_exhaustive_search_times',
'fraction_of_gpu_memory_to_use', 'cudnn_deterministic',
'enable_cublas_tensor_op_math', 'conv_workspace_size_limit',
'cudnn_exhaustive_search', 'memory_optimize_debug', 'selected_gpus',
'cudnn_exhaustive_search_times', 'sync_nccl_allreduce'
]
core.init_gflags([sys.argv[0]] +
......
......@@ -26,6 +26,13 @@ import numpy as np
from .. import compat as cpt
from .proto import framework_pb2
try:
if os.name == 'nt':
import sys
third_lib_path = os.path.abspath(os.path.dirname(
__file__)) + os.sep + '..' + os.sep + 'libs'
os.environ['path'] += ';' + third_lib_path
sys.path.append(third_lib_path)
from . import core
except ImportError as e:
if os.name == 'nt':
......
......@@ -26,7 +26,7 @@ from ..initializer import Normal, Constant
from ..framework import Variable, OpProtoHolder
from ..param_attr import ParamAttr
from .layer_function_generator import autodoc, templatedoc, _generate_doc_string_
from .tensor import concat
from .tensor import concat, assign
from . import utils
from .. import unique_name
from functools import reduce
......@@ -340,9 +340,7 @@ def embedding(input,
"""
helper = LayerHelper('embedding', **locals())
remote_prefetch = False
if os.environ.get('PADDLE_ENABLE_REMOTE_PREFETCH'):
remote_prefetch = True
remote_prefetch = is_sparse and (not is_distributed)
if remote_prefetch:
assert is_sparse is True and is_distributed is False
w = helper.create_parameter(
......@@ -5033,12 +5031,18 @@ def nce(input,
else:
num_neg_samples = int(num_neg_samples)
remote_prefetch = is_sparse
print(
"With sparse mode, if your models has only small parameter prefetch may cause speed down"
)
attrs = {
'num_total_classes': int(num_total_classes),
'num_neg_samples': num_neg_samples,
'seed': seed,
'sampler': sampler,
'is_sparse': is_sparse
'is_sparse': is_sparse,
'remote_prefetch': remote_prefetch
}
helper.append_op(
......@@ -5148,7 +5152,10 @@ def hsigmoid(input,
pass
weights = None
remote_prefetch = is_sparse
print(
"With sparse mode, if your models has only small parameter prefetch may cause speed down"
)
if not is_custom:
weights = helper.create_parameter(
attr=helper.param_attr,
......@@ -5164,7 +5171,7 @@ def hsigmoid(input,
inputs = {
"X": input,
"W": weights,
"PTable": path_table,
"PathTable": path_table,
"PathCode": path_code,
"Label": label
}
......@@ -5187,9 +5194,13 @@ def hsigmoid(input,
type="hierarchical_sigmoid",
inputs=inputs,
outputs={"Out": out,
"PreOut": pre_out},
attrs={"num_classes": num_classes,
"is_sparse": is_sparse})
"PreOut": pre_out,
"W_Out": weights},
attrs={
"num_classes": num_classes,
"is_sparse": is_sparse,
"remote_prefetch": remote_prefetch
})
return out
......@@ -7685,7 +7696,7 @@ def brelu(x, t_min=0.0, t_max=24.0, name=None):
Examples:
.. code-block:: python
.. code-block:: python
x = fluid.layers.data(name="x", shape=[2,3,16,16], dtype="float32")
y = fluid.layers.brelu(x, t_min=1.0, t_max=20.0)
......
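The layer changes above derive remote_prefetch from is_sparse for nce and hsigmoid and pass it down as an operator attribute. A hedged usage sketch of the Python side, modeled on the transpiler tests later in this commit (the shapes, names, and class count are illustrative, and running it requires a Paddle build with fluid available):

import paddle.fluid as fluid

feats = fluid.layers.data(name="feats", shape=[10], dtype="float32")
label = fluid.layers.data(name="label", shape=[1], dtype="int64")

# With is_sparse=True this revision also turns on remote_prefetch internally.
cost = fluid.layers.nce(
    input=feats,
    label=label,
    num_total_classes=20,
    num_neg_samples=5,
    is_sparse=True)
avg_cost = fluid.layers.mean(cost)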
......@@ -21,6 +21,8 @@ if(NOT WITH_DISTRIBUTE)
LIST(REMOVE_ITEM TEST_OPS test_dist_simnet_bow)
LIST(REMOVE_ITEM TEST_OPS test_dist_mnist_batch_merge)
LIST(REMOVE_ITEM TEST_OPS test_dist_text_classification)
LIST(REMOVE_ITEM TEST_OPS test_nce_remote_table_op)
LIST(REMOVE_ITEM TEST_OPS test_hsigmoid_remote_table_op)
endif(NOT WITH_DISTRIBUTE)
if (NOT ${WITH_GPU})
......@@ -32,7 +34,6 @@ endif()
list(REMOVE_ITEM TEST_OPS test_seq_concat_op) # FIXME(helin): https://github.com/PaddlePaddle/Paddle/issues/8290
list(REMOVE_ITEM TEST_OPS test_modified_huber_loss_op) # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/5184
list(REMOVE_ITEM TEST_OPS test_lstm_unit_op) # # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/5185
list(REMOVE_ITEM TEST_OPS test_nce) # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/7778
list(REMOVE_ITEM TEST_OPS test_recurrent_op) # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/6152
list(REMOVE_ITEM TEST_OPS test_cond_op) # FIXME(qijun): https://github.com/PaddlePaddle/Paddle/issues/5101#issuecomment-339814957
......
......@@ -78,7 +78,6 @@ class TestParallelExecutorBase(unittest.TestCase):
exec_strategy.allow_op_delay = allow_op_delay
if use_fast_executor:
exec_strategy.use_experimental_executor = True
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce \
if use_reduce else fluid.BuildStrategy.ReduceStrategy.AllReduce
......
......@@ -51,8 +51,9 @@ class TestConv2dFusionOp(OpTest):
input = np.random.random(self.input_size).astype(self.dtype)
filter = np.random.random(self.filter_size).astype(self.dtype)
self.output = conv2d_forward_naive(input, filter, self.groups,
conv2d_param).astype(self.dtype)
self.output, _, _, _, _ = conv2d_forward_naive(
input, filter, self.groups, conv2d_param)
self.output = self.output.astype(self.dtype)
self.inputs = {
'Input': OpTest.np_dtype_to_fluid_dtype(input),
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy as np
import paddle.fluid.core as core
from op_test import OpTest
from test_conv2d_op import conv2d_forward_naive, TestConv2dOp
def conv2d_forward_refer(input, filter, group, conv_param):
out, in_n, out_h, out_w, out_c = conv2d_forward_naive(input, filter, group,
conv_param)
out_tmp = np.zeros((in_n, out_h, out_w, out_c))
for n in range(in_n):
for i in range(out_h):
for j in range(out_w):
for m in range(out_c):
out_tmp[n, i, j, m] = out[n, m, i, j]
return out_tmp.reshape(in_n, out_c, out_h, out_w)
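The nested loops in conv2d_forward_refer above rearrange the naive NCHW output into NHWC element by element and then hand the buffer back with an NCHW-looking shape, presumably so the reference layout matches the nhwc destination memory used by the INT8 MKL-DNN kernel. The loops are equivalent to a transpose plus a reshape, as this small NumPy check on toy shapes shows:

import numpy as np

n, c, h, w = 2, 3, 4, 5
out_nchw = np.arange(n * c * h * w, dtype=np.float32).reshape(n, c, h, w)

# Transpose-then-reshape version of conv2d_forward_refer's loops.
refer = out_nchw.transpose(0, 2, 3, 1).reshape(n, c, h, w)

# Element-by-element version, mirroring the loops above.
loop = np.zeros((n, h, w, c), dtype=np.float32)
for ni in range(n):
    for i in range(h):
        for j in range(w):
            for m in range(c):
                loop[ni, i, j, m] = out_nchw[ni, m, i, j]
loop = loop.reshape(n, c, h, w)

assert np.array_equal(refer, loop)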
class TestConv2dInt8Op(TestConv2dOp):
def setUp(self):
self.op_type = "conv2d"
self.use_cudnn = False
self.exhaustive_search = False
self.use_cuda = False
self.use_mkldnn = False
self.data_format = "AnyLayout"
self.weighttype = np.float32
self.use_mkldnn = True
self.init_group()
self.init_dilation()
self.init_test_case()
self.init_dtype()
conv2d_param = {
'stride': self.stride,
'pad': self.pad,
'dilation': self.dilations
}
filter = np.random.random(self.filter_size).astype(self.weighttype)
if self.srctype == np.uint8:
input = np.random.randint(0, 10,
self.input_size).astype(self.srctype)
else:
input = np.random.randint(-5, 5,
self.input_size).astype(self.srctype)
input_shift = (np.ones(self.input_size) * 128).astype(np.uint8)
if self.srctype == np.int8:
filter_int = np.round(filter * self.scale_weights[0] *
0.5).astype(np.int32)
scale_output_shift = self.scale_out / (self.scale_in *
self.scale_weights[0] * 0.5)
output1 = conv2d_forward_refer(
np.round((input.astype(np.int32) + input_shift) *
self.scale_in).astype(np.int32), filter_int,
self.groups,
conv2d_param).astype(np.float32) * scale_output_shift
output2 = conv2d_forward_refer(
np.round((input_shift) * self.scale_in).astype(np.int32),
filter_int, self.groups,
conv2d_param).astype(np.float32) * scale_output_shift
output = np.round(output1 - output2).astype(self.dsttype)
else:
filter_int = np.round(filter *
self.scale_weights[0]).astype(np.int32)
scale_output_shift = self.scale_out / (self.scale_in *
self.scale_weights[0])
output1 = conv2d_forward_refer(
input.astype(np.int32), filter_int, self.groups,
conv2d_param).astype(np.float32)
output = np.round(output1 * scale_output_shift).astype(self.dsttype)
self.inputs = {
'Input':
OpTest.np_dtype_to_fluid_dtype(input.astype(self.srctype)),
'Filter': OpTest.np_dtype_to_fluid_dtype(filter)
}
self.attrs = {
'strides': self.stride,
'paddings': self.pad,
'groups': self.groups,
'dilations': self.dilations,
'use_cudnn': self.use_cudnn,
'use_mkldnn': self.use_mkldnn,
'data_format': self.data_format,
'exhaustive_search': self.exhaustive_search,
'Scale_in': self.scale_in,
'Scale_out': self.scale_out,
'Scale_weights': self.scale_weights,
}
self.outputs = {'Output': output}
def test_check_output(self):
self.check_output_with_place(core.CPUPlace(), atol=0)
def test_check_grad(self):
pass
def test_check_grad_no_filter(self):
pass
def test_check_grad_no_input(self):
pass
def init_test_case(self):
TestConv2dOp.init_test_case(self)
f_c = self.input_size[1] // self.groups
self.filter_size = [1, f_c, 3, 3]
self.scale_in = 1.0
self.scale_out = 0.5
self.scale_weights = [10.0]
def init_dtype(self):
self.srctype = np.uint8
self.dsttype = np.int8
#--------------------test conv2d u8 in and s8 out--------------------
class TestConv2d(TestConv2dInt8Op):
def init_test_case(self):
self.pad = [0, 0]
self.stride = [1, 1]
self.input_size = [2, 3, 5, 5] # NCHW
assert np.mod(self.input_size[1], self.groups) == 0
f_c = self.input_size[1] // self.groups
self.filter_size = [6, f_c, 3, 3]
self.scale_in = 1.0
self.scale_out = 0.5
self.scale_weights = [10.0]
class TestWithPad(TestConv2d):
def init_test_case(self):
TestConv2d.init_test_case(self)
self.pad = [1, 1]
class TestWithGroup(TestConv2d):
def init_group(self):
self.groups = 3
class TestWithStride(TestConv2dInt8Op):
def init_test_case(self):
self.pad = [1, 1]
self.stride = [2, 2]
self.input_size = [2, 3, 6, 6]
assert np.mod(self.input_size[1], self.groups) == 0
f_c = self.input_size[1] // self.groups
self.filter_size = [6, f_c, 3, 3]
self.scale_in = 1.0
self.scale_out = 0.8
self.scale_weights = [10.0]
class TestWith1x1(TestConv2dInt8Op):
def init_test_case(self):
self.pad = [0, 0]
self.stride = [1, 1]
self.input_size = [1, 3, 5, 5]
assert np.mod(self.input_size[1], self.groups) == 0
f_c = self.input_size[1] // self.groups
self.filter_size = [6, f_c, 1, 1]
self.scale_in = 1.0
self.scale_out = 0.5
self.scale_weights = [12.0]
class TestWithInput1x1Filter1x1(TestConv2dInt8Op):
def init_test_case(self):
self.pad = [0, 0]
self.stride = [1, 1]
self.input_size = [2, 3, 1, 1]
assert np.mod(self.input_size[1], self.groups) == 0
f_c = self.input_size[1] // self.groups
self.filter_size = [6, f_c, 1, 1]
self.scale_in = 1.0
self.scale_out = 0.5
self.scale_weights = [10.0]
def init_group(self):
self.groups = 3
#--------------------test conv2d s8 in and s8 out--------------------
def create_test_int8_class(parent):
class TestInt8Case(parent):
def init_dtype(self):
self.srctype = np.int8
self.dsttype = np.int8
cls_name = "{0}_{1}".format(parent.__name__, "s8s8")
TestInt8Case.__name__ = cls_name
globals()[cls_name] = TestInt8Case
create_test_int8_class(TestConv2dInt8Op)
create_test_int8_class(TestWithPad)
create_test_int8_class(TestWithStride)
create_test_int8_class(TestWithGroup)
create_test_int8_class(TestWith1x1)
create_test_int8_class(TestWithInput1x1Filter1x1)
if __name__ == '__main__':
unittest.main()
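The uint8 branch of TestConv2dInt8Op above shifts the input by 128 so it stays non-negative, convolves the shifted input and the constant shift separately, and subtracts the two results; because convolution is linear in its input, conv(x + c) - conv(c) == conv(x), so the subtraction recovers the unshifted result (up to the rounding applied in the test). A quick NumPy check of that identity on a toy 1-D convolution:

import numpy as np

rng = np.random.RandomState(0)
x = rng.randint(-5, 5, size=16).astype(np.int32)
k = rng.randint(-3, 3, size=3).astype(np.int32)
shift = 128

direct = np.convolve(x, k, mode="valid")
shifted = (np.convolve(x + shift, k, mode="valid")
           - np.convolve(np.full_like(x, shift), k, mode="valid"))

assert np.array_equal(direct, shifted)   # conv(x + c) - conv(c) == conv(x)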
......@@ -60,7 +60,7 @@ def conv2d_forward_naive(input, filter, group, conv_param):
np.sum(input_pad_masked * f_sub[k, :, :, :],
axis=(1, 2, 3))
return out
return out, in_n, out_h, out_w, out_c
class TestConv2dOp(OpTest):
......@@ -85,8 +85,9 @@ class TestConv2dOp(OpTest):
input = np.random.random(self.input_size).astype(self.dtype)
filter = np.random.random(self.filter_size).astype(self.dtype)
output = conv2d_forward_naive(input, filter, self.groups,
conv2d_param).astype(self.dtype)
output, _, _, _, _ = conv2d_forward_naive(input, filter, self.groups,
conv2d_param)
output = output.astype(self.dtype)
self.inputs = {
'Input': OpTest.np_dtype_to_fluid_dtype(input),
......
......@@ -442,10 +442,10 @@ class TestDistBase(unittest.TestCase):
tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method nccl2 --lr %f"
tr0_cmd = tr_cmd % \
(self._python_interp, model, self._ps_endpoints,
0, w0_ep, self._lr / 2)
0, w0_ep, self._lr)
tr1_cmd = tr_cmd % \
(self._python_interp, model, self._ps_endpoints,
1, w1_ep, self._lr / 2)
1, w1_ep, self._lr)
if self._mem_opt:
tr0_cmd += " --mem_opt"
......
......@@ -14,14 +14,15 @@
from __future__ import print_function
import traceback
import math
import collections
import six
import unittest
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import delete_ops
import traceback
import collections
import six
class TranspilerTest(unittest.TestCase):
......@@ -520,7 +521,7 @@ class TestLocalLookupTable(TestDistLookupTableBase):
'split_selected_rows', 'send', 'sequence_pool_grad',
'lookup_table_grad', 'sequence_pool_grad', 'lookup_table_grad',
'sum', 'split_selected_rows', 'send', 'send_barrier', 'recv',
'recv', 'recv', 'recv', 'fetch_barrier', 'concat', 'concat'
'recv', 'fetch_barrier'
]
self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)
......@@ -560,7 +561,7 @@ class TestDistLookupTable(TestDistLookupTableBase):
'lookup_table_grad', 'split_selected_rows', 'send',
'sequence_pool_grad', 'lookup_table_grad', 'sequence_pool_grad',
'lookup_table_grad', 'sum', 'split_ids', 'send', 'send_barrier',
'recv', 'recv', 'recv', 'fetch_barrier', 'concat'
'recv', 'recv', 'fetch_barrier'
]
self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)
startup_ops = [
......@@ -607,8 +608,7 @@ class TestAsyncLocalLookupTable(TestDistLookupTableBase):
'send', 'concat_grad', 'sequence_pool_grad', 'lookup_table_grad',
'split_selected_rows', 'send', 'sequence_pool_grad',
'lookup_table_grad', 'sequence_pool_grad', 'lookup_table_grad',
'sum', 'split_selected_rows', 'send', 'recv', 'recv', 'recv',
'recv', 'concat', 'concat'
'sum', 'split_selected_rows', 'send', 'recv', 'recv'
]
self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)
......@@ -648,8 +648,7 @@ class TestAsyncDistLookupTable(TestDistLookupTableBase):
'mul_grad', 'send', 'concat_grad', 'sequence_pool_grad',
'lookup_table_grad', 'split_selected_rows', 'send',
'sequence_pool_grad', 'lookup_table_grad', 'sequence_pool_grad',
'lookup_table_grad', 'sum', 'split_ids', 'send', 'recv', 'recv',
'recv', 'concat'
'lookup_table_grad', 'sum', 'split_ids', 'send', 'recv', 'recv'
]
self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)
startup_ops = [
......@@ -824,5 +823,142 @@ class TestRemoteLookupTable(TestDistLookupTableBase):
self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)
# test for remote prefetch
class TestRemoteNce(TestDistLookupTableBase):
def network_with_table(self, is_sparse, is_distributed):
num_total_classes = 20
sampler = "uniform"
nid_freq_arr = np.random.dirichlet(np.ones(20) * 1000).astype('float32')
input = fluid.layers.data(name="input", shape=[10], dtype="float32")
label = fluid.layers.data(name="label", shape=[1], dtype="int64")
w_param = fluid.default_main_program().global_block().create_parameter(
shape=[num_total_classes, 10],
dtype='float32',
name='nce_w',
initializer=fluid.initializer.ConstantInitializer())
b_param = fluid.default_main_program().global_block().create_parameter(
shape=[num_total_classes, 1],
dtype='float32',
name='nce_b',
initializer=fluid.initializer.ConstantInitializer())
cost = fluid.layers.nce(input=input,
label=label,
num_total_classes=num_total_classes,
sampler=sampler,
custom_dist=nid_freq_arr.tolist(),
sample_weight=None,
param_attr='nce_w',
bias_attr='nce_b',
seed=1,
num_neg_samples=5,
is_sparse=is_sparse)
avg_cost = fluid.layers.mean(cost)
# optimizer
optimizer = fluid.optimizer.Adam(learning_rate=0.003)
optimizer.minimize(avg_cost)
def net_conf(self):
import os
os.environ['PADDLE_ENABLE_REMOTE_PREFETCH'] = "1"
self.network_with_table(is_sparse=True, is_distributed=False)
def transpiler_test_impl(self):
trainer, _ = self.get_trainer()
out_vars = ["nce_w"]
in_vars = ["nce_b"]
recv_var_names = []
for op in trainer.blocks[0].ops:
if op.type == "recv":
for var in op.output("Out"):
recv_var_names.append(var)
for out_var in out_vars:
self.assertFalse(out_var in recv_var_names)
for in_var in in_vars:
self.assertTrue(in_var in recv_var_names)
# test for remote prefetch
class TestRemoteHsigmoid(TestDistLookupTableBase):
def network_with_table(self, is_sparse, is_distributed):
num_total_classes = 3
input = fluid.layers.data(name="input", shape=[1], dtype="float32")
label = fluid.layers.data(name="label", shape=[1], dtype="int64")
path_table = fluid.layers.data(
name='path_table', shape=[3], dtype='int64')
path_code = fluid.layers.data(
name='path_code', shape=[3], dtype='int64')
w_param = fluid.default_main_program().global_block().create_parameter(
shape=[num_total_classes, 10],
dtype='float32',
name='hs_w',
initializer=fluid.initializer.ConstantInitializer())
b_param = fluid.default_main_program().global_block().create_parameter(
shape=[3, 1],
dtype='float32',
name='hs_b',
initializer=fluid.initializer.ConstantInitializer())
emb = fluid.layers.embedding(
input=input,
is_sparse=is_sparse,
size=[3, 3],
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(num_total_classes))))
cost = fluid.layers.hsigmoid(
input=emb,
label=label,
num_classes=num_total_classes,
path_table=path_table,
path_code=path_code,
is_custom=True,
is_sparse=is_sparse)
avg_cost = fluid.layers.mean(cost)
# optimizer
optimizer = fluid.optimizer.SGD(learning_rate=0.003)
optimizer.minimize(avg_cost)
def net_conf(self):
import os
os.environ['PADDLE_ENABLE_REMOTE_PREFETCH'] = "1"
self.network_with_table(is_sparse=True, is_distributed=False)
def transpiler_test_impl(self):
trainer, _ = self.get_trainer()
params_to_check = list()
for op in trainer.blocks[0].ops:
if op.type == "hierarchical_sigmoid":
params_to_check = [op.input("W")[0], op.input("Bias")[0]]
for name in ["epmap", "table_names", "epmap"]:
assert op.has_attr(name)
if name == "epmap":
assert op.attr(name)[0] == u'127.0.0.1:6174'
elif name == "table_names":
assert op.attr(name)[0] == u'hierarchical_sigmoid_0.w_0'
else:
assert op.attr(name) == 3
elif op.type == "lookup_table":
params_to_check.append(op.input("W")[0])
else:
pass
op_count = 0
for op in trainer.blocks[0].ops:
if op.type == "recv":
assert len(op.output("Out")) == 1
assert op.output("Out")[0] == u'hierarchical_sigmoid_0.b_0'
op_count += 1
assert op_count == 1
if __name__ == "__main__":
unittest.main()
......@@ -29,6 +29,12 @@ def train(network, use_cuda, use_parallel_executor, batch_size=32, pass_num=2):
print('Skip use_cuda=True because Paddle is not compiled with cuda')
return
if use_parallel_executor and os.name == 'nt':
print(
'Skip use_parallel_executor=True because Paddle comes without parallel support on windows'
)
return
word_dict = paddle.dataset.imdb.word_dict()
train_reader = paddle.batch(
paddle.dataset.imdb.train(word_dict), batch_size=batch_size)
......
......@@ -185,7 +185,7 @@ class TestHSigmoidOpSparse(OpTest):
self.inputs = {
'X': x,
'W': w,
'PTable': path_table,
'PathTable': path_table,
'PathCode': path_code,
'Label': label,
'Bias': bias
......@@ -287,7 +287,7 @@ class TestHSigmoidOpWithCostumTree(OpTest):
self.inputs = {
'X': x,
'W': w,
'PTable': path_table,
'PathTable': path_table,
'PathCode': path_code,
'Label': label,
'Bias': bias
......@@ -324,7 +324,7 @@ class TestHSigmoidOpWithCostumTreeWithoutBias(OpTest):
self.inputs = {
'X': x,
'W': w,
'PTable': path_table,
'PathTable': path_table,
'PathCode': path_code,
'Label': label,
}
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import signal
import time
import unittest
from multiprocessing import Process
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.core as core
from paddle.fluid.op import Operator
from paddle.fluid.framework import Program, program_guard
def run_pserver(pserver_id, use_cuda, sync_mode):
scope = fluid.core.Scope()
program = Program()
with fluid.scope_guard(scope):
with program_guard(program, startup_program=Program()):
# create table parameter in scope
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
# create and initialize Param Variable
param = scope.var('table').get_tensor()
param_array = np.ones((5, 8)).astype("float32")
for i in range(len(param_array)):
param_array[i] *= param_array[i] * i + pserver_id * 10 + 1
param.set(param_array, place)
optimize_block = program._create_block(program.global_block().idx)
program.global_block().append_op(
type="listen_and_serv",
inputs={'X': []},
outputs={},
attrs={
"optimize_blocks": [optimize_block],
"endpoint": '127.0.0.1:0',
"Fanin": 1,
"sync_mode": True,
"grad_to_block_id": []
})
exe = fluid.Executor(place)
exe.run(program)
class TestListenAndServOp(unittest.TestCase):
def setUp(self):
self.ps_timeout = 5
def _start_pserver(self, pserver_id, use_cuda, sync_mode, pserver_func):
p = Process(target=pserver_func, args=(pserver_id, use_cuda, sync_mode))
p.daemon = True
p.start()
return p
def _wait_ps_ready(self, pid):
start_left_time = self.ps_timeout
sleep_time = 0.5
while True:
assert start_left_time >= 0, "wait ps ready failed"
time.sleep(sleep_time)
try:
                # the listen_and_serv_op writes a file under /tmp containing the
                # listening port once it is ready to process RPC calls.
os.stat("/tmp/paddle.%d.port" % pid)
return
except os.error:
start_left_time -= sleep_time
def _get_pserver_port(self, pid):
with open("/tmp/paddle.%d.port" % pid, 'r') as f:
port = int(f.read().strip())
return port
def _run_hsigmoid_op_one_pserver(self, place, port):
scope = fluid.core.Scope()
program = Program()
with fluid.scope_guard(scope):
with program_guard(program, startup_program=Program()):
x = scope.var('X').get_tensor()
x_array = np.random.random((4, 8)).astype("float32") * 2
x.set(x_array, place)
# create and initialize Param Variable
param = scope.var('W').get_tensor()
param_array = np.zeros((5, 8)).astype("float32") * 2
param.set(param_array, place)
path_table = scope.var('PathTable').get_tensor()
path_table_array = np.array(
[(0, 2, -1, -1, -1), (0, 1, 2, -1, -1), (0, 1, 4, -1, -1),
(0, 2, -1, -1, -1)]).astype(
"int64"
) #np.array to store 1,2,5,6s' non-leaf path(root -> leaf)
path_table.set(path_table_array, place)
path_code = scope.var('PathCode').get_tensor()
path_code_array = np.array(
[(0, 0, -1, -1, -1), (1, 1, 1, -1, -1), (1, 0, 0, -1, -1),
(0, 1, -1, -1, -1)]).astype("int64") #np.array to store
path_code.set(path_code_array, place)
label = scope.var('Label').get_tensor()
label_array = np.array([0, 1, 4, 5])
label.set(label_array, place)
bias = scope.var('Bias').get_tensor()
bias_array = np.random.random((5, 1)).astype("float32")
bias.set(bias_array, place)
out = scope.var('Out').get_tensor()
                pre_out = scope.var('PreOut').get_tensor()
w_out = scope.var('W_Out').get_tensor()
w_out.set(param_array, place)
emaps = ['127.0.0.1:' + str(port)]
table_names = ['table']
height_sections = [2]
# create and run sgd operator
hsigmoid_op = Operator(
"hierarchical_sigmoid",
X='X',
W='W',
PathTable='PathTable',
PathCode='PathCode',
Label='Label',
Bias='Bias',
Out='Out',
PreOut='PreOut',
W_Out='W_Out',
remote_prefetch=True,
epmap=emaps,
table_names=table_names,
height_sections=height_sections)
hsigmoid_op.run(scope, place)
# get and compare result
result_array = np.array(w_out)
self.assertEqual(list(result_array.shape), [5, 8])
correct = None
for i in range(5):
if i != 3:
correct = np.full((1, 8), i + 1).astype("float32")
self.assertTrue((result_array[i] == correct).all())
else:
correct = np.full((1, 8), 0).astype("float32")
self.assertTrue((result_array[i] == correct).all())
def _run_hsigmoid_op_two_pserver(self, place, port0, port1):
scope = fluid.core.Scope()
program = Program()
with fluid.scope_guard(scope):
with program_guard(program, startup_program=Program()):
x = scope.var('X').get_tensor()
x_array = np.random.random((4, 8)).astype("float32") * 2
x.set(x_array, place)
# create and initialize Param Variable
param = scope.var('W').get_tensor()
param_array = np.zeros((5, 8)).astype("float32") * 2
param.set(param_array, place)
path_table = scope.var('PathTable').get_tensor()
path_table_array = np.array(
[(0, 2, -1, -1, -1), (0, 1, 3, -1, -1), (0, 1, 4, -1, -1),
(0, 2, -1, -1, -1)]).astype(
"int64"
) #np.array to store 1,2,5,6s' non-leaf path(root -> leaf)
path_table.set(path_table_array, place)
path_code = scope.var('PathCode').get_tensor()
path_code_array = np.array(
[(0, 0, -1, -1, -1), (1, 1, 1, -1, -1), (1, 0, 0, -1, -1),
(0, 1, -1, -1, -1)]).astype("int64") #np.array to store
path_code.set(path_code_array, place)
label = scope.var('Label').get_tensor()
label_array = np.array([0, 1, 4, 5])
label.set(label_array, place)
bias = scope.var('Bias').get_tensor()
bias_array = np.random.random((5, 1)).astype("float32")
bias.set(bias_array, place)
out = scope.var('Out').get_tensor()
                pre_out = scope.var('PreOut').get_tensor()
w_out = scope.var('W_Out').get_tensor()
w_out.set(param_array, place)
emaps = ['127.0.0.1:' + str(port0), '127.0.0.1:' + str(port1)]
table_names = ['table', 'table']
height_sections = [2, 3]
# create and run sgd operator
hsigmoid_op = Operator(
"hierarchical_sigmoid",
X='X',
W='W',
PathTable='PathTable',
PathCode='PathCode',
Label='Label',
Bias='Bias',
Out='Out',
PreOut='PreOut',
W_Out='W_Out',
remote_prefetch=True,
epmap=emaps,
table_names=table_names,
height_sections=height_sections)
hsigmoid_op.run(scope, place)
# get and compare result
result_array = np.array(w_out)
self.assertEqual(list(result_array.shape), [5, 8])
correct = None
for i in range(5):
if i < 2:
correct = np.full((1, 8), i + 1).astype("float32")
self.assertTrue((result_array[i] == correct).all())
else:
correct = np.full((1, 8), i + 9).astype("float32")
self.assertTrue((result_array[i] == correct).all())
def test_hsigmoid_op_remote(self):
os.environ['PADDLE_ENABLE_REMOTE_PREFETCH'] = "1"
# run pserver on CPU in sync mode
p0 = self._start_pserver(0, False, True, run_pserver)
self._wait_ps_ready(p0.pid)
port0 = self._get_pserver_port(p0.pid)
p1 = self._start_pserver(1, False, True, run_pserver)
self._wait_ps_ready(p1.pid)
port1 = self._get_pserver_port(p1.pid)
places = [core.CPUPlace()]
for place in places:
self._run_hsigmoid_op_one_pserver(place, port0)
self._run_hsigmoid_op_two_pserver(place, port0, port1)
        # send SIGINT to the pservers
os.kill(p0.pid, signal.SIGINT)
p0.join()
os.kill(p1.pid, signal.SIGINT)
p1.join()
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import signal
import time
import unittest
from multiprocessing import Process
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.core as core
from paddle.fluid.op import Operator
from paddle.fluid.framework import Program, program_guard
def nce(input, weight, bias, sample_weight, labels, num_classes,
num_sample_class):
samples = []
sample_labels = []
batch_size = input.shape[0]
num_true_class = labels.shape[1]
for i in range(batch_size):
w = 1 if sample_weight is None else sample_weight[i]
for label in labels[i]:
samples.append((i, label, True, w))
sample_labels.append(label)
for num in range(num_sample_class):
samples.append((i, num, False, w))
sample_labels.append(num)
# forward bias
sample_out = np.zeros(len(samples)).astype(np.float32)
if bias is not None:
for i in range(len(samples)):
sample_out[i] = bias[samples[i][1]]
# forward weight
for i in range(len(samples)):
sample_out[i] += np.dot(input[samples[i][0]], weight[samples[i][1]])
# forward activation
sample_out = 1.0 / (1.0 + np.exp(-sample_out))
# forward cost
out = np.zeros(batch_size).astype(np.float32)
b = 1.0 / num_classes * num_sample_class
for i in range(len(samples)):
o = sample_out[i]
cost = -np.log(o / (o + b)) if samples[i][2] else -np.log(b / (o + b))
out[samples[i][0]] += cost * samples[i][3]
return (out[:, np.newaxis], np.array(sample_out).reshape(
batch_size, num_sample_class + num_true_class),
np.array(sample_labels).reshape(batch_size,
num_sample_class + num_true_class))
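The nce() helper above is a NumPy reference for the operator's forward pass; a quick call on tiny random data (shapes chosen to mirror the two-pserver test below, relying on the nce definition and the numpy import already in this file) looks like:

batch, dim, num_classes, num_neg = 4, 8, 5, 2
x = np.random.random((batch, dim)).astype("float32")
weight = np.random.random((num_classes, dim)).astype("float32")
bias = np.random.random((num_classes, 1)).astype("float32")
labels = np.array([[0], [1], [4], [3]])

cost, sample_logits, sample_labels = nce(
    x, weight, bias, None, labels, num_classes, num_neg)
print(cost.shape, sample_logits.shape, sample_labels.shape)   # (4, 1) (4, 3) (4, 3)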
def run_pserver(pserver_id, use_cuda, sync_mode):
scope = fluid.core.Scope()
program = Program()
with fluid.scope_guard(scope):
with program_guard(program, startup_program=Program()):
# create table parameter in scope
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
# create and initialize Param Variable
param = scope.var('table').get_tensor()
param_array = np.ones((5, 8)).astype("float32")
for i in range(len(param_array)):
param_array[i] *= param_array[i] * i + pserver_id * 10 + 1
param.set(param_array, place)
optimize_block = program._create_block(program.global_block().idx)
program.global_block().append_op(
type="listen_and_serv",
inputs={'X': []},
outputs={},
attrs={
"optimize_blocks": [optimize_block],
"endpoint": '127.0.0.1:0',
"Fanin": 1,
"sync_mode": True,
"grad_to_block_id": []
})
exe = fluid.Executor(place)
exe.run(program)
class TestListenAndServOp(unittest.TestCase):
def setUp(self):
self.ps_timeout = 5
def _start_pserver(self, pserver_id, use_cuda, sync_mode, pserver_func):
p = Process(target=pserver_func, args=(pserver_id, use_cuda, sync_mode))
p.daemon = True
p.start()
return p
def _wait_ps_ready(self, pid):
start_left_time = self.ps_timeout
sleep_time = 0.5
while True:
assert start_left_time >= 0, "wait ps ready failed"
time.sleep(sleep_time)
try:
                # the listen_and_serv_op writes a file under /tmp containing the
                # listening port once it is ready to process RPC calls.
os.stat("/tmp/paddle.%d.port" % pid)
return
except os.error:
start_left_time -= sleep_time
def _get_pserver_port(self, pid):
with open("/tmp/paddle.%d.port" % pid, 'r') as f:
port = int(f.read().strip())
return port
def _run_nce_op_two_pserver(self, place, port0, port1):
scope = fluid.core.Scope()
program = Program()
with fluid.scope_guard(scope):
with program_guard(program, startup_program=Program()):
x = scope.var('Input').get_tensor()
x_array = np.random.random((4, 8)).astype("float32")
x.set(x_array, place)
# create and initialize Param Variable
param = scope.var('Weight').get_tensor()
param_array = np.zeros((5, 8)).astype("float32")
param.set(param_array, place)
bias = scope.var('Bias').get_tensor()
bias_array = np.random.random((5, 1)).astype("float32")
bias.set(bias_array, place)
sample_w = scope.var('SampleWeight').get_tensor()
sample_weight = np.random.random((4, 1)).astype("float32")
sample_w.set(sample_weight, place)
label = scope.var('Label').get_tensor()
label_array = np.array([[0], [1], [4], [3]])
label.set(label_array, place)
cost = scope.var('Cost').get_tensor()
cost_w = np.zeros((4, 1)).astype("float32")
cost.set(cost_w, place)
sample_l = scope.var('SampleLogits').get_tensor()
sample_l_w = np.zeros((4, 3)).astype("float32")
sample_l.set(sample_l_w, place)
sample_la = scope.var('SampleLabels').get_tensor()
sample_la_w = np.zeros((4, 3)).astype("int")
sample_la.set(sample_la_w, place)
emaps = ['127.0.0.1:' + str(port0), '127.0.0.1:' + str(port1)]
table_names = ['table', 'table']
height_sections = [2, 3]
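# Each pserver created a full (5, 8) 'table', but height_sections = [2, 3]
# makes the nce op fetch rows 0-1 from the first endpoint and rows 2-4 from
# the second, so together the two shards form the 5-row weight matrix. With
# custom_neg_classes=[0, 1] the negative sampling is deterministic, which
# lets the NumPy reference nce() reproduce the op's outputs.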
# create and run nce operator
nce_op = Operator(
"nce",
Input='Input',
Weight='Weight',
Label='Label',
Bias='Bias',
Cost='Cost',
SampleLogits='SampleLogits',
SampleLabels='SampleLabels',
SampleWeight='SampleWeight',
num_total_classes=5,
num_neg_samples=2,
custom_neg_classes=list(range(2)),
sampler=0,
seed=0,
is_sparse=True,
remote_prefetch=True,
epmap=emaps,
table_names=table_names,
height_sections=height_sections)
nce_op.run(scope, place)
# get and compare result
o_cost = np.array(scope.var('Cost').get_tensor())
o_logits = np.array(scope.var('SampleLogits').get_tensor())
o_labels = np.array(scope.var('SampleLabels').get_tensor())
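# Rebuild the expected weight table locally: rows 0-1 mirror how pserver 0
# initialized its shard and rows 2-4 mirror pserver 1 (see run_pserver), so
# the NumPy reference runs against the same parameters the op prefetched.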
param_array = np.ones((5, 8)).astype("float32")
for i in range(2):
param_array[i] *= param_array[i] * i + 0 * 10 + 1
for i in range(2, 5):
param_array[i] *= param_array[i] * i + 1 * 10 + 1
out = nce(x_array, param_array, bias_array, sample_weight,
label_array, 5, 2)
# element-wise comparison against the NumPy reference
np.testing.assert_almost_equal(o_cost, out[0], decimal=6)
np.testing.assert_almost_equal(o_logits, out[1], decimal=6)
np.testing.assert_almost_equal(o_labels, out[2], decimal=6)
def test_nce_op_remote(self):
os.environ['PADDLE_ENABLE_REMOTE_PREFETCH'] = "1"
# run pserver on CPU in sync mode
p0 = self._start_pserver(0, False, True, run_pserver)
self._wait_ps_ready(p0.pid)
port0 = self._get_pserver_port(p0.pid)
p1 = self._start_pserver(1, False, True, run_pserver)
self._wait_ps_ready(p1.pid)
port1 = self._get_pserver_port(p1.pid)
places = [core.CPUPlace()]
for place in places:
self._run_nce_op_two_pserver(place, port0, port1)
# send SIGINT to both pservers to shut them down
os.kill(p0.pid, signal.SIGINT)
p0.join()
os.kill(p1.pid, signal.SIGINT)
p1.join()
if __name__ == '__main__':
unittest.main()
......@@ -175,41 +175,61 @@ class TestCRFModel(unittest.TestCase):
print(pe.run(feed=feeder.feed(cur_batch),
fetch_list=[avg_cost.name])[0])
def test_update_sparse_parameter_all_reduce(self):
def _new_build_strategy(self, use_reduce=False):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce
if use_reduce:
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
else:
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce
return build_strategy
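# _new_build_strategy consolidates the BuildStrategy construction shared by
# the four tests below; use_reduce selects the Reduce strategy, otherwise
# AllReduce is used.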
def test_update_sparse_parameter_all_reduce(self):
if core.is_compiled_with_cuda():
self.check_network_convergence(
is_sparse=True, build_strategy=build_strategy, use_cuda=True)
is_sparse=True,
build_strategy=self._new_build_strategy(),
use_cuda=True)
self.check_network_convergence(
is_sparse=True, build_strategy=build_strategy, use_cuda=False)
is_sparse=True,
build_strategy=self._new_build_strategy(),
use_cuda=False)
def test_update_dense_parameter_all_reduce(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce
if core.is_compiled_with_cuda():
self.check_network_convergence(
is_sparse=False, build_strategy=build_strategy, use_cuda=True)
is_sparse=False,
build_strategy=self._new_build_strategy(),
use_cuda=True)
self.check_network_convergence(
is_sparse=False, build_strategy=build_strategy, use_cuda=False)
is_sparse=False,
build_strategy=self._new_build_strategy(),
use_cuda=False)
def test_update_sparse_parameter_reduce(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
if core.is_compiled_with_cuda():
self.check_network_convergence(
is_sparse=True, build_strategy=build_strategy, use_cuda=True)
is_sparse=True,
build_strategy=self._new_build_strategy(use_reduce=True),
use_cuda=True)
self.check_network_convergence(
is_sparse=True, build_strategy=build_strategy, use_cuda=False)
is_sparse=True,
build_strategy=self._new_build_strategy(use_reduce=True),
use_cuda=False)
def test_update_dense_parameter_reduce(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
if core.is_compiled_with_cuda():
self.check_network_convergence(
is_sparse=False, build_strategy=build_strategy, use_cuda=True)
is_sparse=False,
build_strategy=self._new_build_strategy(use_reduce=True),
use_cuda=True)
self.check_network_convergence(
is_sparse=False, build_strategy=build_strategy, use_cuda=False)
is_sparse=False,
build_strategy=self._new_build_strategy(use_reduce=True),
use_cuda=False)
if __name__ == '__main__':
......
......@@ -86,6 +86,7 @@ class TestMNIST(TestParallelExecutorBase):
"label": label},
use_cuda=use_cuda,
use_reduce=False)
reduce_first_loss, reduce_last_loss = self.check_network_convergence(
model,
feed_dict={"image": img,
......
......@@ -251,11 +251,10 @@ class DistributeTranspiler(object):
def _get_all_remote_sparse_update_op(self, main_program):
sparse_update_ops = []
sparse_update_op_types = ["lookup_table"]
sparse_update_op_types = ["lookup_table", "nce", "hierarchical_sigmoid"]
for op in main_program.global_block().ops:
if op.type in sparse_update_op_types and op.attr(
'remote_prefetch') is True and not op.attr(
'is_distributed'):
'remote_prefetch') is True:
sparse_update_ops.append(op)
return sparse_update_ops
......