diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index aa7b8e8db59a58f6131a79474c31964a0b950f0c..0c1e02368ec9b7f29754438bf52097ebb0bd2edd 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -199,6 +199,7 @@ paddle.fluid.layers.merge_selected_rows ArgSpec(args=['x', 'name'], varargs=None paddle.fluid.layers.get_tensor_from_selected_rows ArgSpec(args=['x', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.lstm ArgSpec(args=['input', 'init_h', 'init_c', 'max_len', 'hidden_size', 'num_layers', 'dropout_prob', 'is_bidirec', 'is_test', 'name', 'default_initializer', 'seed'], varargs=None, keywords=None, defaults=(0.0, False, False, None, None, -1)) paddle.fluid.layers.py_func ArgSpec(args=['func', 'x', 'out', 'backward_func', 'skip_vars_in_backward_input'], varargs=None, keywords=None, defaults=(None, None)) +paddle.fluid.layers.psroi_pool ArgSpec(args=['input', 'rois', 'output_channels', 'spatial_scale', 'pooled_height', 'pooled_width', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.data ArgSpec(args=['name', 'shape', 'append_batch_size', 'dtype', 'lod_level', 'type', 'stop_gradient'], varargs=None, keywords=None, defaults=(True, 'float32', 0, VarType.LOD_TENSOR, True)) paddle.fluid.layers.open_files ArgSpec(args=['filenames', 'shapes', 'lod_levels', 'dtypes', 'thread_num', 'buffer_size', 'pass_num', 'is_test'], varargs=None, keywords=None, defaults=(None, None, 1, None)) paddle.fluid.layers.read_file ArgSpec(args=['reader'], varargs=None, keywords=None, defaults=None) diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index ce429fefa77b81dff9bf997ba092e92d97cb0dc0..6d7a69c8c9e11016f641f73e296156da09114408 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -72,6 +72,8 @@ cc_library(lod_tensor SRCS lod_tensor.cc DEPS ddim place tensor framework_proto cc_test(lod_tensor_test SRCS lod_tensor_test.cc DEPS lod_tensor memory) nv_test(lod_tensor_gpu_test SRCS lod_tensor_test.cu DEPS lod_tensor) +cc_library(garbage_collector SRCS garbage_collector.cc DEPS device_context memory) + cc_library(reader SRCS reader.cc DEPS lod_tensor ddim) cc_test(reader_test SRCS reader_test.cc DEPS reader) @@ -183,6 +185,8 @@ else() cc_test(test_naive_executor SRCS naive_executor_test.cc DEPS naive_executor elementwise_add_op) endif() +target_link_libraries(executor garbage_collector) + cc_library(parallel_executor SRCS parallel_executor.cc DEPS threaded_ssa_graph_executor scope_buffered_ssa_graph_executor graph build_strategy diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index 2f76cb714fcf7297c17552bcff965c6039253a2e..a927a3afcddb52f571543462e485b682aac163ae 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -45,10 +45,10 @@ cc_library(fuse_vars_op_handle SRCS fuse_vars_op_handle.cc DEPS op_handle_base s cc_library(modify_op_lock_and_record_event_pass SRCS modify_op_lock_and_record_event_pass.cc DEPS computation_op_handle op_graph_view multi_devices_helper) -if (WITH_GPU) - cc_library(reference_count_pass SRCS reference_count_pass.cc DEPS computation_op_handle scale_loss_grad_op_handle rpc_op_handle - all_reduce_op_handle reduce_op_handle broadcast_op_handle data_balance_op_handle graph graph_helper pass) -endif() +cc_library(reference_count_pass_helper SRCS reference_count_pass_helper.cc DEPS garbage_collector computation_op_handle) 
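+# Note: reference_count_pass_helper only carries the shared types and pass
+# attribute names (reference count maps, garbage collector map) used by both
+# reference_count_pass and eager_deletion_pass below.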
+cc_library(eager_deletion_op_handle SRCS eager_deletion_op_handle.cc DEPS lod_tensor selected_rows reference_count_pass_helper) +cc_library(eager_deletion_pass SRCS eager_deletion_pass.cc DEPS computation_op_handle eager_deletion_op_handle graph graph_helper pass) +cc_library(reference_count_pass SRCS reference_count_pass.cc DEPS computation_op_handle graph graph_helper pass op_graph_view reference_count_pass_helper) cc_library(sequential_execution_pass SRCS sequential_execution_pass.cc DEPS graph graph_helper pass) cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS graph graph_helper pass) @@ -56,10 +56,7 @@ cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS graph graph_he cc_library(multi_devices_graph_pass SRCS multi_devices_graph_pass.cc DEPS multi_devices_helper computation_op_handle scale_loss_grad_op_handle rpc_op_handle all_reduce_op_handle reduce_op_handle broadcast_op_handle data_balance_op_handle fused_broadcast_op_handle) -set(SSA_GRAPH_EXECUTOR_DEPS graph framework_proto sequential_execution_pass modify_op_lock_and_record_event_pass all_reduce_deps_pass) -if (WITH_GPU) - list(APPEND SSA_GRAPH_EXECUTOR_DEPS reference_count_pass) -endif() +set(SSA_GRAPH_EXECUTOR_DEPS graph framework_proto sequential_execution_pass modify_op_lock_and_record_event_pass all_reduce_deps_pass reference_count_pass eager_deletion_pass) cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ${SSA_GRAPH_EXECUTOR_DEPS}) diff --git a/paddle/fluid/framework/details/computation_op_handle.cc b/paddle/fluid/framework/details/computation_op_handle.cc index 7ad1e40c600c6e70cea822fac777ff20163078e6..7beb8c8de9fc49aebc66ca44de8736240aabbc30 100644 --- a/paddle/fluid/framework/details/computation_op_handle.cc +++ b/paddle/fluid/framework/details/computation_op_handle.cc @@ -20,11 +20,13 @@ namespace paddle { namespace framework { namespace details { ComputationOpHandle::ComputationOpHandle(ir::Node *node, Scope *scope, - platform::Place place) + platform::Place place, + size_t scope_idx) : OpHandleBase(node), op_(framework::OpRegistry::CreateOp(*node->Op())), scope_(scope), - place_(place) {} + place_(place), + scope_idx_(scope_idx) {} void ComputationOpHandle::RunImpl() { WaitInputVarGenerated(place_); diff --git a/paddle/fluid/framework/details/computation_op_handle.h b/paddle/fluid/framework/details/computation_op_handle.h index 662a91d6b4dfcfed563fdf2e46c22f83f90b40af..601ae4f8c6de11b0bf25d4f9a92ef8eada67be3d 100644 --- a/paddle/fluid/framework/details/computation_op_handle.h +++ b/paddle/fluid/framework/details/computation_op_handle.h @@ -28,7 +28,8 @@ namespace framework { namespace details { struct ComputationOpHandle : public OpHandleBase { public: - ComputationOpHandle(ir::Node *node, Scope *scope, platform::Place place); + ComputationOpHandle(ir::Node *node, Scope *scope, platform::Place place, + size_t scope_idx); std::string Name() const override; @@ -38,6 +39,8 @@ struct ComputationOpHandle : public OpHandleBase { void SetLockAndRecordEventFree(bool b) { is_lock_and_record_event_free_ = b; } + size_t GetScopeIdx() const { return scope_idx_; } + protected: void RunImpl() override; @@ -47,6 +50,7 @@ struct ComputationOpHandle : public OpHandleBase { std::unique_ptr op_; Scope *scope_; platform::Place place_; + size_t scope_idx_; bool is_lock_and_record_event_free_{false}; }; } // namespace details diff --git a/paddle/fluid/framework/details/eager_deletion_op_handle.cc b/paddle/fluid/framework/details/eager_deletion_op_handle.cc new file mode 100644 index 
0000000000000000000000000000000000000000..abacb11e3b018308c20a67630e3ff34cca7d3387
--- /dev/null
+++ b/paddle/fluid/framework/details/eager_deletion_op_handle.cc
@@ -0,0 +1,122 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/framework/details/eager_deletion_op_handle.h"
+#include "paddle/fluid/framework/lod_tensor_array.h"
+#include "paddle/fluid/framework/scope.h"
+#include "paddle/fluid/framework/selected_rows.h"
+#ifdef PADDLE_WITH_CUDA
+#include "paddle/fluid/platform/cuda_device_guard.h"
+#endif
+
+namespace paddle {
+namespace framework {
+namespace details {
+
+EagerDeletionOpHandle::EagerDeletionOpHandle(
+    ir::Node *node, const Scope *scope, const platform::Place &place,
+    const std::unordered_set<std::string> &var_names, GarbageCollector *gc,
+    AtomicReferenceCountMap *ref_cnts)
+    : OpHandleBase(node),
+      scope_(scope),
+      var_names_(var_names),
+      gc_(gc),
+      ref_cnts_(ref_cnts) {
+#ifdef PADDLE_WITH_CUDA
+  if (platform::is_gpu_place(place)) {
+    dev_ctx_ = reinterpret_cast<platform::CUDADeviceContext *>(
+        platform::DeviceContextPool::Instance().Get(place));
+    if (dynamic_cast<StreamGarbageCollector *>(gc_)) {
+      platform::CUDADeviceGuard guard(
+          boost::get<platform::CUDAPlace>(place).device);
+      PADDLE_ENFORCE(cudaEventCreateWithFlags(&event_, cudaEventDisableTiming));
+      PADDLE_ENFORCE_NOT_NULL(event_);
+    }
+  }
+#endif
+}
+
+EagerDeletionOpHandle::~EagerDeletionOpHandle() {
+#ifdef PADDLE_WITH_CUDA
+  if (event_) {
+    auto gpu_place = boost::get<platform::CUDAPlace>(dev_ctx_->GetPlace());
+    platform::CUDADeviceGuard guard(gpu_place.device);
+    PADDLE_ENFORCE(cudaEventDestroy(event_));
+  }
+#endif
+}
+
+std::string EagerDeletionOpHandle::Name() const { return "eager_deletion"; }
+
+void EagerDeletionOpHandle::RunImpl() {
+  auto *exec_scope = scope_->FindVar(kLocalExecScopeName)->Get<Scope *>();
+  std::deque<std::shared_ptr<memory::Allocation>> garbages;
+  for (auto &name : var_names_) {
+    auto it = ref_cnts_->find(name);
+    // Var not found, or its reference count has not decreased to 0
+    if (it == ref_cnts_->end() || it->second.fetch_sub(1) != 1) {
+      continue;
+    }
+
+    auto *var = exec_scope->FindVar(name);
+    if (var == nullptr) {
+      continue;
+    }
+
+    VLOG(2) << "Erase variable " << name;
+
+    if (var->IsType<LoDTensor>()) {
+      garbages.emplace_back(var->GetMutable<LoDTensor>()->MoveMemoryHolder());
+    } else if (var->IsType<SelectedRows>()) {
+      garbages.emplace_back(
+          var->GetMutable<SelectedRows>()->mutable_value()->MoveMemoryHolder());
+    } else if (var->IsType<LoDTensorArray>()) {
+      auto *tensor_arr = var->GetMutable<LoDTensorArray>();
+      for (auto &t : *tensor_arr) {
+        garbages.emplace_back(t.MoveMemoryHolder());
+      }
+    } else {
+      PADDLE_THROW("Type %s of %s is not supported for eager deletion",
+                   var->Type().name(), name);
+    }
+  }
+
+  if (!garbages.empty()) {
+    ClearGarbages(&garbages);
+  }
+}
+
+void EagerDeletionOpHandle::ClearGarbages(
+    std::deque<std::shared_ptr<memory::Allocation>> *garbages) {
+#ifdef PADDLE_WITH_CUDA
+  if (event_) {
+    auto compute_stream = dev_ctx_->stream();
+    auto callback_stream =
+        reinterpret_cast<StreamGarbageCollector *>(gc_)->stream();
+    auto callback_func = [=]() {
+      PADDLE_ENFORCE(cudaEventRecord(event_, compute_stream));
+      PADDLE_ENFORCE(cudaStreamWaitEvent(callback_stream, event_, 0));
+    };
+    gc_->Add(std::move(*garbages), callback_func);
+  } else {
+#endif
+    gc_->Add(std::move(*garbages));
+#ifdef PADDLE_WITH_CUDA
+  }
+#endif
+}
+
+}  // namespace details
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/fluid/framework/details/eager_deletion_op_handle.h b/paddle/fluid/framework/details/eager_deletion_op_handle.h
new file mode 100644
index 0000000000000000000000000000000000000000..64867afad5b70a2ba31e5cb315daffcf433b5935
--- /dev/null
+++ b/paddle/fluid/framework/details/eager_deletion_op_handle.h
@@ -0,0 +1,58 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <deque>
+#include <string>
+#include "paddle/fluid/framework/details/op_handle_base.h"
+#include "paddle/fluid/framework/details/reference_count_pass_helper.h"
+
+namespace paddle {
+namespace framework {
+class Scope;
+
+namespace details {
+
+class EagerDeletionOpHandle : public OpHandleBase {
+ public:
+  EagerDeletionOpHandle(ir::Node *node, const Scope *scope,
+                        const platform::Place &place,
+                        const std::unordered_set<std::string> &var_names,
+                        GarbageCollector *gc,
+                        AtomicReferenceCountMap *ref_cnts);
+
+  ~EagerDeletionOpHandle();
+
+  std::string Name() const override;
+
+ protected:
+  void RunImpl() override;
+
+ private:
+  void ClearGarbages(std::deque<std::shared_ptr<memory::Allocation>> *garbages);
+
+  const Scope *scope_;
+  std::unordered_set<std::string> var_names_;
+  GarbageCollector *gc_;               // not own
+  AtomicReferenceCountMap *ref_cnts_;  // not own
+#ifdef PADDLE_WITH_CUDA
+  platform::CUDADeviceContext *dev_ctx_{nullptr};
+  cudaEvent_t event_{nullptr};
+#endif
+};
+
+}  // namespace details
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/fluid/framework/details/eager_deletion_pass.cc b/paddle/fluid/framework/details/eager_deletion_pass.cc
new file mode 100644
index 0000000000000000000000000000000000000000..4e42d0b4972d567dd769cad6ff8b9d45380ab77a
--- /dev/null
+++ b/paddle/fluid/framework/details/eager_deletion_pass.cc
@@ -0,0 +1,101 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
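+
+// EagerDeletionPass consumes the per-variable last-live-ops analysis produced
+// by reference_count_pass, inverts it into an op -> variable-names map, and
+// appends one EagerDeletionOpHandle per op so that each variable is freed as
+// soon as its runtime reference count drops to zero.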
+
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "paddle/fluid/framework/details/computation_op_handle.h"
+#include "paddle/fluid/framework/details/eager_deletion_op_handle.h"
+#include "paddle/fluid/framework/details/eager_deletion_pass.h"
+#include "paddle/fluid/framework/details/multi_devices_helper.h"
+#include "paddle/fluid/framework/ir/graph_helper.h"
+
+namespace paddle {
+namespace framework {
+namespace details {
+
+std::unique_ptr<ir::Graph> EagerDeletionPass::ApplyImpl(
+    std::unique_ptr<ir::Graph> graph) const {
+  auto &ref_cnts =
+      Get<std::vector<AtomicReferenceCountMap>>(kRuntimeReferenceCount);
+  PADDLE_ENFORCE(ref_cnts.empty(),
+                 "kRuntimeReferenceCount should be initialized here!");
+
+  const auto &vars = graph->Get<GraphVars>(kGraphVars);
+  ref_cnts.resize(vars.size());
+
+  const auto &last_live_ops =
+      Get<std::vector<LastLiveOpsOfVars>>(kLastLiveOpsOfVars);
+  const auto &gcs = Get<GarbageCollectorMap>(kGarbageCollector);
+  const auto &places = Get<std::vector<platform::Place>>(kAllPlaces);
+
+  // a reverse map of last_live_ops
+  // i.e., last op --> variable names which can be deleted.
+  std::unordered_map<ComputationOpHandle *, std::unordered_set<std::string>>
+      op_vars_map;
+
+  for (auto &var_ops_map : last_live_ops) {
+    for (auto &var_ops_pair : var_ops_map) {
+      const std::string &var_name = var_ops_pair.first;
+      for (auto *op : var_ops_pair.second) {
+        op_vars_map[op].insert(var_name);
+      }
+    }
+  }
+
+  for (auto &pair : op_vars_map) {
+    auto *op = pair.first;
+    auto &var_names = pair.second;
+
+    auto *eager_deletion_node =
+        graph->CreateEmptyNode("eager_deletion", ir::Node::Type::kOperation);
+    auto *eager_deletion_op = new EagerDeletionOpHandle(
+        eager_deletion_node, op->GetScope(), op->GetPlace(), var_names,
+        gcs.at(places[op->GetScopeIdx()]).get(),
+        &(ref_cnts[op->GetScopeIdx()]));
+
+    auto it = std::find_if(
+        op->Outputs().begin(), op->Outputs().end(), [](VarHandleBase *var) {
+          return dynamic_cast<DummyVarHandle *>(var) != nullptr;
+        });
+
+    if (it != op->Outputs().end()) {
+      eager_deletion_op->AddInput(*it);
+    } else {
+      auto *dep_var = new DummyVarHandle(graph->CreateControlDepVar());
+      graph->Get<GraphDepVars>(kGraphDepVars).emplace(dep_var);
+      op->AddOutput(dep_var);
+      eager_deletion_op->AddInput(dep_var);
+    }
+
+    auto *dummy_leaf = new DummyVarHandle(graph->CreateControlDepVar());
+    graph->Get<GraphDepVars>(kGraphDepVars).emplace(dummy_leaf);
+    eager_deletion_op->AddOutput(dummy_leaf);
+  }
+
+  VLOG(10) << "Create " << op_vars_map.size() << " EagerDeletionOpHandle(s)";
+  return graph;
+}
+
+}  // namespace details
+}  // namespace framework
+}  // namespace paddle
+
+REGISTER_PASS(eager_deletion_pass,
+              paddle::framework::details::EagerDeletionPass)
+    .RequirePassAttr(paddle::framework::details::kRuntimeReferenceCount)
+    .RequirePassAttr(paddle::framework::details::kLastLiveOpsOfVars)
+    .RequirePassAttr(paddle::framework::details::kAllPlaces)
+    .RequirePassAttr(paddle::framework::details::kGarbageCollector);
diff --git a/paddle/fluid/framework/details/eager_deletion_pass.h b/paddle/fluid/framework/details/eager_deletion_pass.h
new file mode 100644
index 0000000000000000000000000000000000000000..d7a7a9709d970841060778806451bc21cb2c7571
--- /dev/null
+++ b/paddle/fluid/framework/details/eager_deletion_pass.h
@@ -0,0 +1,32 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "paddle/fluid/framework/ir/graph.h" +#include "paddle/fluid/framework/ir/pass.h" + +namespace paddle { +namespace framework { +namespace details { + +class EagerDeletionPass : public ir::Pass { + protected: + std::unique_ptr ApplyImpl( + std::unique_ptr graph) const override; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index cbae5321d9a3fdcaebca2bd9111fff6a10b235a8..8af1d62dea89343ff2d41dd7c6ac837459df7685 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -565,7 +565,7 @@ void MultiDevSSAGraphBuilder::CreateComputationalOp(ir::Graph *result, int dev_id) const { result->Get(kGraphOps).emplace_back( new ComputationOpHandle(result->CreateOpNode(node->Op()), - local_scopes_[dev_id], places_[dev_id])); + local_scopes_[dev_id], places_[dev_id], dev_id)); CreateOpHandleIOs(result, node, dev_id); } @@ -688,8 +688,8 @@ void MultiDevSSAGraphBuilder::CreateComputationalOps(ir::Graph *result, for (size_t scope_idx = 0; scope_idx < num_places; ++scope_idx) { auto p = places_[scope_idx]; auto s = local_scopes_[scope_idx]; - result->Get(kGraphOps).emplace_back( - new ComputationOpHandle(result->CreateOpNode(node->Op()), s, p)); + result->Get(kGraphOps).emplace_back(new ComputationOpHandle( + result->CreateOpNode(node->Op()), s, p, scope_idx)); CreateOpHandleIOs(result, node, scope_idx); } } diff --git a/paddle/fluid/framework/details/op_graph_view.cc b/paddle/fluid/framework/details/op_graph_view.cc index 4838c4198ff35ba3fb562f3a7c0563ee60179e3b..d3865c2c2919c2d43521e4f51013e5fa1b10416d 100644 --- a/paddle/fluid/framework/details/op_graph_view.cc +++ b/paddle/fluid/framework/details/op_graph_view.cc @@ -23,6 +23,8 @@ namespace details { OpGraphView::OpGraphView(const std::vector &ops) { Build(ops); } void OpGraphView::Build(const std::vector &ops) { + preceding_ops_.clear(); + pending_ops_.clear(); for (auto &op : ops) { preceding_ops_[op]; pending_ops_[op]; @@ -40,6 +42,7 @@ void OpGraphView::Build(const std::vector &ops) { std::unordered_set OpGraphView::AllOps() const { std::unordered_set ret; + ret.reserve(preceding_ops_.size()); for (auto &pair : preceding_ops_) { ret.insert(pair.first); } diff --git a/paddle/fluid/framework/details/op_graph_view.h b/paddle/fluid/framework/details/op_graph_view.h index afb3e8e59461eeba10d7027fc70b89cc170c1805..77aa02eba56acb3bb20a5c5a55c75af78a3c1c81 100644 --- a/paddle/fluid/framework/details/op_graph_view.h +++ b/paddle/fluid/framework/details/op_graph_view.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include #include #include @@ -34,6 +34,11 @@ class OpGraphView { bool HasOp(OpHandleBase *op) const; + // Use a visitor to visit all pending ops of op + // Stop when callback returns false + template + bool VisitAllPendingOps(OpHandleBase *op, Callback &&callback) const; + private: void Build(const std::vector &ops); void EnforceHasOp(OpHandleBase *op) const; 
@@ -44,6 +49,28 @@ class OpGraphView { pending_ops_; }; +template +bool OpGraphView::VisitAllPendingOps(OpHandleBase *op, + Callback &&callback) const { + EnforceHasOp(op); + std::unordered_set visited; + std::queue q; + q.push(op); + do { + op = q.front(); + q.pop(); + for (auto &pending_op : pending_ops_.at(op)) { + if (visited.count(pending_op) == 0) { + visited.insert(pending_op); + if (!callback(pending_op)) { + return false; + } + } + } + } while (!q.empty()); + return true; +} + } // namespace details } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/details/reference_count_op_handle.h b/paddle/fluid/framework/details/reference_count_op_handle.h deleted file mode 100644 index cc4ccfbdfc720284e683a8f3f59a4aa57a3a9eb1..0000000000000000000000000000000000000000 --- a/paddle/fluid/framework/details/reference_count_op_handle.h +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include - -#include "paddle/fluid/framework/details/op_handle_base.h" -#include "paddle/fluid/framework/garbage_collector.h" -#include "paddle/fluid/framework/scope.h" -#include "paddle/fluid/framework/selected_rows.h" -#include "paddle/fluid/framework/tensor.h" - -namespace paddle { -namespace framework { -namespace details { - -using ReferenceCountMap = std::unordered_map; -using AtomicReferenceCountMap = - std::unordered_map>; -using DeviceReferenceCountMap = - std::unordered_map>; -using AtomicDeviceReferenceCountMap = - std::unordered_map>; -using DeviceGarbageCollectorMap = - std::unordered_map>>; - -class ReferenceCountOpHandle : public OpHandleBase { - public: - ReferenceCountOpHandle(ir::Node *node, const Scope *scope, - const platform::CUDAPlace &place, - const std::vector &var_names, - GarbageCollector *gc, - AtomicReferenceCountMap *ref_cnts) - : OpHandleBase(node), scope_(scope), gc_(gc), ref_cnts_(ref_cnts) { - dev_ctx_ = static_cast( - platform::DeviceContextPool::Instance().Get(place)); - if (IsStreamGarabageCollector()) { - platform::SetDeviceId(place.device); - PADDLE_ENFORCE(cudaEventCreateWithFlags(&event_, cudaEventDisableTiming)); - } - - for (auto &name : var_names) AddVar(name); - } - - ~ReferenceCountOpHandle() { - if (IsStreamGarabageCollector()) { - auto gpu_place = boost::get(dev_ctx_->GetPlace()); - platform::SetDeviceId(gpu_place.device); - PADDLE_ENFORCE(cudaEventDestroy(event_)); - } - } - - std::string Name() const override { return "reference_count"; } - - void AddVar(const std::string &name) { - auto it = var_names_.find(name); - if (it != var_names_.end()) - ++(it->second); - else - var_names_[name] = 1; - } - - protected: - void RunImpl() override { - auto *exec_scope = scope_->FindVar(kLocalExecScopeName)->Get(); - std::vector tensors; - for (auto &pair : var_names_) { - auto &name = pair.first; - auto it = ref_cnts_->find(name); - if (it == ref_cnts_->end()) continue; - - auto *var = 
exec_scope->FindVar(name); - if (var == nullptr) continue; - - if (var->IsType()) { - if (it->second.fetch_sub(pair.second) <= pair.second) { - tensors.emplace_back(var->GetMutable()); - } - } else if (var->IsType()) { - if (it->second.fetch_sub(pair.second) <= pair.second) { - tensors.emplace_back( - var->GetMutable()->mutable_value()); - } - } - } - - if (!tensors.empty()) { - ClearTensors(tensors); - } - } - - private: - void ClearTensors(const std::vector &tensors) { - auto *gc = dynamic_cast *>(gc_); - if (gc != nullptr) { - auto compute_stream = dev_ctx_->stream(); - auto callback_stream = gc->stream(); - auto callback_func = [=]() { - PADDLE_ENFORCE(cudaEventRecord(event_, compute_stream)); - PADDLE_ENFORCE(cudaStreamWaitEvent(callback_stream, event_, 0)); - }; - gc_->Add(tensors, callback_func); - } else { - gc_->Add(tensors); - } - } - - bool IsStreamGarabageCollector() const { - return dynamic_cast *>(gc_) != nullptr; - } - - const Scope *scope_; - platform::CUDADeviceContext *dev_ctx_; - std::unordered_map var_names_; - GarbageCollector *gc_; // not own - AtomicReferenceCountMap *ref_cnts_; // not own - cudaEvent_t event_; -}; - -} // namespace details -} // namespace framework -} // namespace paddle diff --git a/paddle/fluid/framework/details/reference_count_pass.cc b/paddle/fluid/framework/details/reference_count_pass.cc index 08783fb5f8b18329c9167edb0dac39b7dd42a746..13a042d8e6ed7f18c76387b666d681df0eabd0b5 100644 --- a/paddle/fluid/framework/details/reference_count_pass.cc +++ b/paddle/fluid/framework/details/reference_count_pass.cc @@ -14,187 +14,240 @@ #include #include +#include #include #include "paddle/fluid/framework/details/computation_op_handle.h" +#include "paddle/fluid/framework/details/eager_deletion_op_handle.h" #include "paddle/fluid/framework/details/multi_devices_helper.h" +#include "paddle/fluid/framework/details/op_graph_view.h" #include "paddle/fluid/framework/details/reference_count_pass.h" +#include "paddle/fluid/framework/details/reference_count_pass_helper.h" #include "paddle/fluid/framework/ir/graph_helper.h" namespace paddle { namespace framework { namespace details { -static ComputationOpHandle *FindNextComputationOpHandle(VarHandle *var_in) { - std::queue queue; - queue.push(var_in); - do { - auto *var = queue.front(); - queue.pop(); - for (auto *op : var->PendingOps()) { - auto *compute_op = dynamic_cast(op); - if (compute_op != nullptr && compute_op->GetPlace() == var_in->place_) { - return compute_op; +// A functor to shrink/remove operators who depend on other operators in a set +class ShrinkDepsOpFunctor { + private: + enum RelationShip { kSame = 0, kNoDeps = 1, kBefore = 2, kAfter = 3 }; + + public: + explicit ShrinkDepsOpFunctor(const std::vector &all_ops) + : graph_(all_ops) {} + + template + OpSet operator()(const OpSet &op_set) const { + using KeyType = typename OpSet::key_type; + static_assert( + std::is_base_of::type>::value, + "Key type of OpSet must be OpHandleBase, or derived of OpHandleBase"); + + if (op_set.size() <= 1) return op_set; + std::vector ops(op_set.begin(), op_set.end()); + OpSet ret; + auto rels = GetRelations(ops); + auto not_before = [](RelationShip r) { return r != kBefore; }; + for (size_t i = 0; i < rels.size(); ++i) { + if (std::all_of(rels[i].begin(), rels[i].end(), not_before)) { + ret.emplace(static_cast(ops[i])); } - for (auto *out_var : op->Outputs()) { - queue.push(out_var); + } + return ret; + } + + private: + std::vector> GetRelations( + const std::vector &ops) const { + std::unordered_map op_to_idx; + for 
(size_t i = 0; i < ops.size(); ++i) {
+      PADDLE_ENFORCE(graph_.HasOp(ops[i]), "Op does not exist in graph");
+      op_to_idx[ops[i]] = i;
+    }
+
+    PADDLE_ENFORCE(op_to_idx.size() == ops.size(), "Duplicate ops");
+
+    std::vector<std::vector<RelationShip>> ret(ops.size());
+    for (auto &e : ret) {
+      e.assign(ops.size(), kSame);
+    }
+
+    size_t found_num = ops.size();
+    size_t total_num = ops.size() * ops.size();
+    auto visitor = [&](OpHandleBase *op, size_t i) {
+      auto it = op_to_idx.find(op);
+      if (it != op_to_idx.end()) {
+        size_t j = it->second;
+        if (i != j && ret[i][j] == kSame) {
+          ret[i][j] = kBefore;
+          ret[j][i] = kAfter;
+          found_num += 2;
+          if (found_num == total_num) {
+            return false;
+          }
+        }
+      }
+      return true;
+    };
+
+    for (size_t i = 0; i < ops.size(); ++i) {
+      auto sub_visitor = [&, i](OpHandleBase *op) { return visitor(op, i); };
+      if (!graph_.VisitAllPendingOps(ops[i], sub_visitor)) {
+        break;
+      }
+    }
+
+    for (size_t i = 0; i < ops.size(); ++i) {
+      for (size_t j = i + 1; j < ops.size(); ++j) {
+        if (ret[i][j] != kSame) continue;
+        ret[i][j] = kNoDeps;
+        ret[j][i] = kNoDeps;
+      }
+    }
+
+    return ret;
+  }
+
+  const OpGraphView graph_;
+};
+
+/**
+ * Find the nearest downstream computation op handle. If the op is a
+ * computation op, just return itself.
+ */
+static ComputationOpHandle *FindNextComputationOpHandleOrReturnItself(
+    OpHandleBase *op, size_t scope_idx) {
+  std::queue<OpHandleBase *> q;
+  std::unordered_set<OpHandleBase *> visited;
+  q.push(op);
+  do {
+    auto *op = q.front();
+    q.pop();
+    auto *compute_op = dynamic_cast<ComputationOpHandle *>(op);
+    if (compute_op != nullptr && compute_op->GetScopeIdx() == scope_idx) {
+      return compute_op;
+    }
+    for (auto *out_var : op->Outputs()) {
+      for (auto *pending_op : out_var->PendingOps()) {
+        if (visited.count(pending_op)) continue;
+        visited.insert(pending_op);
+        q.push(pending_op);
+      }
+    }
-  } while (!queue.empty());
+  } while (!q.empty());
   return nullptr;
 }
 
-static void AddDependencyBetween(OpHandleBase *in, OpHandleBase *out,
-                                 ir::Graph *graph) {
-  auto it = std::find_if(
-      in->Outputs().begin(), in->Outputs().end(), [](VarHandleBase *var) {
-        return dynamic_cast<DummyVarHandle *>(var) != nullptr;
-      });
-
-  if (it != in->Outputs().end()) {
-    out->AddInput(*it);
-  } else {
-    auto *dep_var = new DummyVarHandle(graph->CreateControlDepVar());
-    graph->Get<GraphDepVars>(kGraphDepVars).emplace(dep_var);
-    in->AddOutput(dep_var);
-    out->AddInput(dep_var);
+static std::unordered_set<ComputationOpHandle *>
+ExtractComputationOpFromLastLivedVar(VarHandle *var, size_t scope_idx,
+                                     const ShrinkDepsOpFunctor &shrink_func,
+                                     bool *ok) {
+  // stage one. Get the last op(s) of the variable.
+  std::unordered_set<OpHandleBase *> candidates;
+  {
+    if (var->PendingOps().empty() && var->GeneratedOp()) {
+      // No operator depends on this variable. So the last operator is the op
+      // who generates this variable.
+      candidates.emplace(var->GeneratedOp());
+    } else {
+      candidates = var->PendingOps();
+    }
+
+    // No pending ops, or the generated op is nullptr
+    if (candidates.empty()) {
+      *ok = false;
+      return {};
+    }
+  }
+
+  // stage two. Try to cast them to computation ops; set *ok = false and
+  // return when this fails.
+  //
+  // The reason why we cannot allow any type of op handle to be the last
+  // lived op is: some op handles may operate on many DeviceContexts, but our
+  // garbage collector can only wait on one DeviceContext for now. So
+  // currently, we wait on the nearest compute op.
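+  // For example, if the last user of a variable were a multi-place op such
+  // as all_reduce, we could not attach the deletion to it directly; instead
+  // we walk downstream to the nearest ComputationOpHandle in the same scope,
+  // which runs on exactly one place.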
+  std::unordered_set<ComputationOpHandle *> computation_op;
+  {
+    for (auto *op : candidates) {
+      auto *compute_op =
+          FindNextComputationOpHandleOrReturnItself(op, scope_idx);
+      if (compute_op == nullptr) {
+        *ok = false;
+        return {};
+      }
+      computation_op.emplace(compute_op);
+    }
   }
+
+  // stage three. Try to shrink the computation ops if they depend on each
+  // other: keep the smallest set that still covers all last users.
+  *ok = true;
+  return shrink_func(computation_op);
+}
+
+static VarDesc *TryGetLatestVarDesc(const std::vector<VarHandle *> &vars) {
+  VarDesc *var_desc = nullptr;
+  std::find_if(vars.rbegin(), vars.rend(), [&](VarHandle *var_handle) -> bool {
+    var_desc = var_handle->Node()->Var();
+    return var_desc != nullptr;
+  });
+  return var_desc;
 }
 
 std::unique_ptr<ir::Graph> ReferenceCountPass::ApplyImpl(
     std::unique_ptr<ir::Graph> graph) const {
-  auto &ref_cnts = Get<DeviceReferenceCountMap>(kGlobalReferenceCount);
-  auto &cur_ref_cnts = Get<AtomicDeviceReferenceCountMap>(kCurReferenceCount);
-  auto &gcs = Get<DeviceGarbageCollectorMap>(kGarbageCollector);
-
-  // It is not easy to find the right reference counts of varaibles in graph
-  // Step 1: Find all variables in computation ops
-  // Step 2: Find all variables in non-computation ops which refers to variables
-  // in computation ops
-  std::unordered_set<std::string> names;
-  std::unordered_map<ComputationOpHandle *, ReferenceCountOpHandle *>
-      compute_ref_cnt_map;
-
-  auto get_ref_cnts_from_compute_op = [&](
-      OpHandleBase *op, const std::vector<VarHandleBase *> &vars) {
-    std::vector<std::string> var_names_in_op;
-    auto *compute_op = dynamic_cast<ComputationOpHandle *>(op);
-    if (compute_op == nullptr ||
-        !platform::is_gpu_place(compute_op->GetPlace()))
-      return var_names_in_op;
-    auto place = boost::get<platform::CUDAPlace>(compute_op->GetPlace());
-    for (VarHandleBase *var_handle_base : vars) {
-      auto *var_handle = dynamic_cast<VarHandle *>(var_handle_base);
-      if (var_handle == nullptr || !var_handle->Node()->IsVar()) continue;
-
-      if (!platform::is_gpu_place(var_handle->place_) ||
-          boost::get<platform::CUDAPlace>(var_handle->place_) != place)
-        continue;
+  auto &ref_cnts = Get<std::vector<ReferenceCountMap>>(kGlobalReferenceCount);
+  auto &last_live_ops_of_vars =
+      Get<std::vector<LastLiveOpsOfVars>>(kLastLiveOpsOfVars);
+
+  PADDLE_ENFORCE(last_live_ops_of_vars.empty() && ref_cnts.empty(),
+                 "Last live ops and reference counts of vars should be "
+                 "initialized here.");
 
-      VarDesc *var_desc = var_handle->Node()->Var();
-      auto var_name = var_handle->Node()->Name();
+  const auto &vars = graph->Get<GraphVars>(kGraphVars);
 
-      // This is weird but there is really some variables without var_desc
-      // in computation_op
-      if (var_desc == nullptr) {
-        var_desc = compute_op->Node()->Op()->Block()->FindVar(var_name);
-        if (var_desc == nullptr) continue;
+  last_live_ops_of_vars.resize(vars.size());
+  ref_cnts.resize(vars.size());
+
+  ShrinkDepsOpFunctor shrink_func(
+      ir::FilterByNodeWrapper<OpHandleBase>(*graph));
+
+  for (size_t i = 0; i < vars.size(); ++i) {
+    for (auto &name_var_pair : vars[i]) {
+      // Can this variable be reused or deleted? If not, we do not compute
+      // reference counts or dependencies.
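+      // Only non-persistable LoDTensor / SelectedRows / LoDTensorArray
+      // variables qualify; everything else is skipped below.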
+ VarDesc *var_desc = TryGetLatestVarDesc(name_var_pair.second); + + if (var_desc == nullptr || var_desc->Persistable()) { + continue; } - if (var_desc->Persistable()) continue; auto var_type = var_desc->Proto()->type().type(); if (var_type != proto::VarType::LOD_TENSOR && - var_type != proto::VarType::SELECTED_ROWS) { + var_type != proto::VarType::SELECTED_ROWS && + var_type != proto::VarType::LOD_TENSOR_ARRAY) { + // Var type cannot be deleted continue; } - // compute op only runs in one device - if (ref_cnts[place.device]->count(var_name)) - ++(*ref_cnts[place.device])[var_name]; - else - (*ref_cnts[place.device])[var_name] = 1; + bool ok; + auto result = ExtractComputationOpFromLastLivedVar( + name_var_pair.second.back(), i, shrink_func, &ok); - names.insert(var_name); - var_names_in_op.push_back(var_name); - } - return var_names_in_op; - }; - - auto update_ref_cnts_from_non_compute_op = [&]( - OpHandleBase *op, const std::vector &vars) { - if (dynamic_cast(op) != nullptr) return; - for (VarHandleBase *var_handle_base : vars) { - auto *var_handle = dynamic_cast(var_handle_base); - if (var_handle == nullptr || !var_handle->Node()->IsVar()) continue; - - auto var_name = var_handle->Node()->Name(); - auto var_place = var_handle->place_; - if (!platform::is_gpu_place(var_place)) continue; - auto place = boost::get(var_place); - if (names.count(var_name) == 0) continue; - if (ref_cnts.count(place.device) && - ref_cnts[place.device]->count(var_name)) { - ++(*ref_cnts[place.device])[var_name]; - - auto *next_compute_op = FindNextComputationOpHandle(var_handle); - if (next_compute_op != nullptr) { - if (compute_ref_cnt_map.count(next_compute_op)) { - compute_ref_cnt_map[next_compute_op]->AddVar(var_name); - VLOG(5) << "Add reference count of " << var_name << " to Operator " - << next_compute_op->Name(); - } else { - // Create new reference_count_op_handle - ir::Node *ref_cnt_node = graph->CreateEmptyNode( - "reference_count", ir::Node::Type::kOperation); - auto *ref_cnt_handle = new ReferenceCountOpHandle( - ref_cnt_node, next_compute_op->GetScope(), place, {var_name}, - gcs[place.device].get(), cur_ref_cnts[place.device].get()); - AddDependencyBetween(next_compute_op, ref_cnt_handle, graph.get()); - compute_ref_cnt_map[next_compute_op] = ref_cnt_handle; - } - } + if (ok) { + auto &var_name = name_var_pair.first; + PADDLE_ENFORCE(!result.empty(), "Last living ops of %s cannot be empty", + var_name); + ref_cnts[i].emplace(var_name, result.size()); + last_live_ops_of_vars[i].emplace(var_name, std::move(result)); } } - }; - - auto all_ops = ir::FilterByNodeWrapper(*graph); - for (auto &op : all_ops) { - auto in_var_names = get_ref_cnts_from_compute_op(op, op->Inputs()); - auto out_var_names = get_ref_cnts_from_compute_op(op, op->Outputs()); - if (in_var_names.empty() && out_var_names.empty()) continue; - in_var_names.insert(in_var_names.end(), out_var_names.begin(), - out_var_names.end()); - auto *compute_op = dynamic_cast(op); - auto place = boost::get(compute_op->GetPlace()); - ir::Node *ref_cnt_node = - graph->CreateEmptyNode("reference_count", ir::Node::Type::kOperation); - auto *ref_cnt_handle = new ReferenceCountOpHandle( - ref_cnt_node, compute_op->GetScope(), place, in_var_names, - gcs[place.device].get(), cur_ref_cnts[place.device].get()); - AddDependencyBetween(compute_op, ref_cnt_handle, graph.get()); - compute_ref_cnt_map[compute_op] = ref_cnt_handle; - } - - for (auto &op : all_ops) { - update_ref_cnts_from_non_compute_op(op, op->Inputs()); - update_ref_cnts_from_non_compute_op(op, 
op->Outputs()); - } - - std::vector new_all_ops; - new_all_ops.reserve(compute_ref_cnt_map.size() + all_ops.size()); - for (auto &op : all_ops) { - new_all_ops.emplace_back(std::move(op)); - auto it = compute_ref_cnt_map.find(new_all_ops.back()); - if (it != compute_ref_cnt_map.end()) { - // Add LeafNode to ReferenceCountOpHandle - auto *dummy_leaf = new DummyVarHandle(graph->CreateControlDepVar()); - graph->Get(kGraphDepVars).emplace(dummy_leaf); - it->second->AddOutput(dummy_leaf); - new_all_ops.emplace_back(std::move(it->second)); - } } - all_ops.swap(new_all_ops); return graph; } @@ -205,5 +258,4 @@ std::unique_ptr ReferenceCountPass::ApplyImpl( REGISTER_PASS(reference_count_pass, paddle::framework::details::ReferenceCountPass) .RequirePassAttr(paddle::framework::details::kGlobalReferenceCount) - .RequirePassAttr(paddle::framework::details::kCurReferenceCount) - .RequirePassAttr(paddle::framework::details::kGarbageCollector); + .RequirePassAttr(paddle::framework::details::kLastLiveOpsOfVars); diff --git a/paddle/fluid/framework/details/reference_count_pass.h b/paddle/fluid/framework/details/reference_count_pass.h index 7081280b0600b9c1985987d02d679c298ad4b8bd..bcbef027354ef5a5fcc7da28103a9565982c7631 100644 --- a/paddle/fluid/framework/details/reference_count_pass.h +++ b/paddle/fluid/framework/details/reference_count_pass.h @@ -14,7 +14,6 @@ #pragma once -#include "paddle/fluid/framework/details/reference_count_op_handle.h" #include "paddle/fluid/framework/ir/graph.h" #include "paddle/fluid/framework/ir/pass.h" @@ -22,10 +21,6 @@ namespace paddle { namespace framework { namespace details { -constexpr char kGlobalReferenceCount[] = "reference_count"; -constexpr char kCurReferenceCount[] = "current_reference_count"; -constexpr char kGarbageCollector[] = "garbage_collector"; - class ReferenceCountPass : public ir::Pass { protected: std::unique_ptr ApplyImpl( diff --git a/paddle/fluid/framework/details/reference_count_pass_helper.cc b/paddle/fluid/framework/details/reference_count_pass_helper.cc new file mode 100644 index 0000000000000000000000000000000000000000..89bd08c2d041d795205b29bb29aba311d1dbd932 --- /dev/null +++ b/paddle/fluid/framework/details/reference_count_pass_helper.cc @@ -0,0 +1,21 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/details/reference_count_pass_helper.h" + +namespace paddle { +namespace framework { +namespace details {} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/reference_count_pass_helper.h b/paddle/fluid/framework/details/reference_count_pass_helper.h new file mode 100644 index 0000000000000000000000000000000000000000..1c083dbf001b08e40a54cc89b21c3dea1f18f16a --- /dev/null +++ b/paddle/fluid/framework/details/reference_count_pass_helper.h @@ -0,0 +1,51 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <atomic>
+#include <map>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "paddle/fluid/framework/garbage_collector.h"
+
+namespace paddle {
+namespace framework {
+namespace details {
+
+class ComputationOpHandle;
+
+using ReferenceCountMap = std::unordered_map<std::string, size_t>;
+
+using AtomicReferenceCountMap =
+    std::unordered_map<std::string, std::atomic<size_t>>;
+
+using GarbageCollectorMap =
+    std::map<platform::Place, std::unique_ptr<GarbageCollector>>;
+
+const char kGlobalReferenceCount[] = "global_reference_count";
+const char kRuntimeReferenceCount[] = "runtime_reference_count";
+const char kGarbageCollector[] = "garbage_collector";
+const char kAllPlaces[] = "all_places";
+
+using LastLiveOpsOfVars =
+    std::unordered_map<std::string, std::unordered_set<ComputationOpHandle *>>;
+const char kLastLiveOpsOfVars[] = "last_live_ops_of_var";
+
+}  // namespace details
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc
index 499246a9856bb3ba67a155c6f00c3ad06af50edf..57f6fc66c57e2a53d9cf30d7761626a50bc379ea 100644
--- a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc
@@ -18,9 +18,6 @@
 #include <vector>
 #include "paddle/fluid/framework/variable_helper.h"
 #include "paddle/fluid/platform/profiler.h"
-#ifdef PADDLE_WITH_CUDA
-#include "paddle/fluid/framework/details/reference_count_op_handle.h"
-#endif
 
 namespace paddle {
 namespace framework {
@@ -69,27 +66,12 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
   platform::RecordEvent e("ScopeBufferedSSAGraphExecutorAfterRun", nullptr);
   drop_scope_counter_ += 1;
 
-#ifdef PADDLE_WITH_CUDA
-  const std::string gc_name = "garbage_collector";
-  DeviceGarbageCollectorMap *gc =
-      Graph().Has(gc_name) ? &(Graph().Get<DeviceGarbageCollectorMap>(gc_name))
-                           : nullptr;
-#endif
-
   if (!fetch_tensors.empty() ||
       drop_scope_counter_ == strategy_.num_iteration_per_drop_scope_) {
     drop_scope_counter_ = 0;
     // Wait All computational streams
     for (auto p : places_) {
       platform::DeviceContextPool::Instance().Get(p)->Wait();
-#ifdef PADDLE_WITH_CUDA
-      if (gc != nullptr && platform::is_gpu_place(p)) {
-        auto gpu_place = boost::get<platform::CUDAPlace>(p);
-        auto &gc_at_place = gc->at(gpu_place.device);
-        gc_at_place->Wait();
-        gc_at_place->Reset();
-      }
-#endif
     }
     for (auto &scope : local_scopes_) {
       auto &local_scope =
diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc
index e97cf44c75cfdc2e7df22aa870916866b18b3b5a..0c4bd336c5b36eff7d93e4099c96cd3c5990ac17 100644
--- a/paddle/fluid/framework/executor.cc
+++ b/paddle/fluid/framework/executor.cc
@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
 
 #include "paddle/fluid/framework/executor.h"
+#include <deque>
 
 #include "paddle/fluid/framework/feed_fetch_method.h"
 #include "paddle/fluid/framework/lod_rank_table.h"
@@ -41,11 +42,43 @@
 namespace {
 int kProgramId = -1;
 }  // namespace
 
+static std::unordered_map<std::string, size_t> GetNonPersistableReferenceCounts(
+    const BlockDesc& block, const std::vector<std::string>& skip_var_list) {
+  std::unordered_map<std::string, size_t> ref_cnts;
+  std::unordered_set<std::string> skip_vars(skip_var_list.begin(),
+                                            skip_var_list.end());
+
+  auto update_ref_cnts = [&](OpDesc* op_desc, const VariableNameMap& name_map) {
+    for (auto& name_pair : name_map) {
+      for (auto& name : name_pair.second) {
+        if (skip_vars.count(name)) continue;
+        auto* var_desc = block.FindVar(name);
+        if (var_desc == nullptr || var_desc->Persistable()) continue;
+        auto type = var_desc->Proto()->type().type();
+        if (type != proto::VarType::LOD_TENSOR &&
+            type != proto::VarType::SELECTED_ROWS &&
+            type != proto::VarType::LOD_TENSOR_ARRAY) {
+          continue;
+        }
+        ++ref_cnts[name];
+      }
+    }
+  };
+
+  for (auto op_desc : block.AllOps()) {
+    update_ref_cnts(op_desc, op_desc->Inputs());
+    update_ref_cnts(op_desc, op_desc->Outputs());
+  }
+  return ref_cnts;
+}
+
 ExecutorPrepareContext::ExecutorPrepareContext(
-    const framework::ProgramDesc& prog, size_t block_id)
+    const framework::ProgramDesc& prog, size_t block_id,
+    const std::vector<std::string>& skip_ref_cnt_vars)
     : prog_(prog), block_id_(block_id) {
   if (GetEagerDeletionThreshold() >= 0) {
-    ref_cnts_ = GetNonPersistableReferenceCount<int>(prog_, block_id_);
+    global_ref_cnts_ = GetNonPersistableReferenceCounts(prog.Block(block_id),
+                                                        skip_ref_cnt_vars);
   }
 }
 
@@ -53,28 +86,40 @@ ExecutorPrepareContext::~ExecutorPrepareContext() {
   VLOG(5) << "destroy ExecutorPrepareContext";
 }
 
-template <typename RefCntMap>
-static void DeleteUnusedTensors(const Scope& scope, const OperatorBase* op,
-                                GarbageCollector<Tensor>* gc,
-                                RefCntMap* ref_cnts) {
-  std::unordered_set<Tensor*> erase_tensors;
+static void DeleteUnusedTensors(
+    const Scope& scope, const OperatorBase* op, GarbageCollector* gc,
+    std::unordered_map<std::string, size_t>* ref_cnts) {
+  std::deque<std::shared_ptr<memory::Allocation>> garbages;
 
   auto handler = [&](const VariableNameMap& name_map) {
     for (auto& name_pair : name_map) {
       for (auto& name : name_pair.second) {
         auto it = ref_cnts->find(name);
         if (it == ref_cnts->end()) continue;
-        if ((it->second)-- == 1) {
-          auto* var = scope.FindVar(name);
-          if (var != nullptr) {
-            VLOG(10) << "Erase tensor \'" << name << "\'";
-            if (var->IsType<LoDTensor>()) {
-              erase_tensors.insert(var->GetMutable<LoDTensor>());
-            } else if (var->IsType<SelectedRows>()) {
-              erase_tensors.insert(
-                  var->GetMutable<SelectedRows>()->mutable_value());
-            }
+        if (--(it->second) != 0) {
+          continue;
+        }
+        auto* var = scope.FindVar(name);
+        if (var == nullptr) {
+          continue;
+        }
+
+        VLOG(2) << "Erase variable " << name;
+        if (var->IsType<LoDTensor>()) {
+          garbages.emplace_back(
+              var->GetMutable<LoDTensor>()->MoveMemoryHolder());
+        } else if (var->IsType<SelectedRows>()) {
+          garbages.emplace_back(var->GetMutable<SelectedRows>()
+                                    ->mutable_value()
+                                    ->MoveMemoryHolder());
+        } else if (var->IsType<LoDTensorArray>()) {
+          auto* lod_tensor_arr = var->GetMutable<LoDTensorArray>();
+          for (auto& t : *lod_tensor_arr) {
+            garbages.emplace_back(t.MoveMemoryHolder());
           }
+        } else {
+          PADDLE_THROW("Type %s of %s is not supported for eager deletion",
+                       var->Type().name(), name);
         }
       }
     }
@@ -83,8 +128,8 @@ static void DeleteUnusedTensors(const Scope& scope, const OperatorBase* op,
   handler(op->Inputs());
   handler(op->Outputs());
 
-  if (!erase_tensors.empty()) {
-    gc->Add(erase_tensors);
+  if (!garbages.empty()) {
+    gc->Add(std::move(garbages));
   }
 }
 
@@ -325,9 +370,10 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
 }
 
 std::unique_ptr<ExecutorPrepareContext>
Executor::Prepare( - const ProgramDesc& program, int block_id) { + const ProgramDesc& program, int block_id, + const std::vector& skip_ref_cnt_vars) { std::unique_ptr ctx( - new ExecutorPrepareContext(program, block_id)); + new ExecutorPrepareContext(program, block_id, skip_ref_cnt_vars)); PADDLE_ENFORCE_LT(static_cast(block_id), program.Size()); auto& block = program.Block(block_id); for (auto& op_desc : block.AllOps()) { @@ -338,16 +384,28 @@ std::unique_ptr Executor::Prepare( } std::vector> Executor::Prepare( - const ProgramDesc& program, const std::vector& block_ids) { + const ProgramDesc& program, const std::vector& block_ids, + const std::vector>& skip_ref_cnt_vars) { + PADDLE_ENFORCE( + skip_ref_cnt_vars.empty() || skip_ref_cnt_vars.size() == block_ids.size(), + "skip_ref_cnt_vars should be either empty or equals to block number %d", + block_ids.size()); std::vector> result; + size_t idx = 0; for (auto& bid : block_ids) { - auto* ctx = new ExecutorPrepareContext(program, bid); + ExecutorPrepareContext* ctx; + if (skip_ref_cnt_vars.empty()) { + ctx = new ExecutorPrepareContext(program, bid); + } else { + ctx = new ExecutorPrepareContext(program, bid, skip_ref_cnt_vars[idx]); + } PADDLE_ENFORCE_LT(static_cast(bid), program.Size()); auto& block = program.Block(bid); for (auto& op_desc : block.AllOps()) { ctx->ops_.push_back(OpRegistry::CreateOp(*op_desc)); } result.push_back(std::shared_ptr(ctx)); + ++idx; } return result; } @@ -365,22 +423,23 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, } int64_t max_memory_size = GetEagerDeletionThreshold(); - std::unique_ptr> gc; - // WhileOp would set keep_kids to true, - // because WhileGradOp needs the scopes created in WhileOp. - // Perhaps, we should not perform eager deletion in WhileOp - // The scopes and variables created by WhileOp would be deleted - // in WhileGradOp. + std::unique_ptr gc; + // skip while_op and while_grad_op temporarily if (max_memory_size >= 0 && !keep_kids) { ctx->ResetReferenceCount(); #ifdef PADDLE_WITH_CUDA if (platform::is_gpu_place(place_)) { - gc.reset(new DefaultStreamGarbageCollector( - boost::get(place_), max_memory_size)); - } else { + if (IsFastEagerDeletionModeEnabled()) { + gc.reset(new UnsafeFastGPUGarbageCollector( + boost::get(place_), max_memory_size)); + } else { + gc.reset(new DefaultStreamGarbageCollector( + boost::get(place_), max_memory_size)); + } + } else if (platform::is_cpu_place(place_)) { #endif - gc.reset(new CPUGarbageCollector( - boost::get(place_), max_memory_size)); + gc.reset(new CPUGarbageCollector(boost::get(place_), + max_memory_size)); #ifdef PADDLE_WITH_CUDA } #endif @@ -389,17 +448,13 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, for (auto& op : ctx->ops_) { op->Run(*local_scope, place_); - if (gc != nullptr) { + if (gc) { DeleteUnusedTensors(*local_scope, op.get(), gc.get(), - &(ctx->cur_ref_cnts_)); + &(ctx->runtime_ref_cnts_)); } } - if (gc != nullptr) { - gc->Wait(); - } else { - platform::DeviceContextPool::Instance().Get(place_)->Wait(); - } + platform::DeviceContextPool::Instance().Get(place_)->Wait(); if (local_scope != scope) { scope->DeleteScope(local_scope); diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index 2d47903ffbd8d821b7c31386b225fe5e65ca2720..5a040ac641588ad4d89d1f6e4c0d6c296eff38eb 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -27,52 +27,21 @@ limitations under the License. 
*/ namespace paddle { namespace framework { -template -std::unordered_map GetNonPersistableReferenceCount( - const ProgramDesc& prog, size_t block_id) { - auto& block = prog.Block(block_id); - std::unordered_map ref_cnts; - - auto update_ref_cnts = [&](OpDesc* op_desc, const VariableNameMap& name_map) { - for (auto& name_pair : name_map) { - for (auto& name : name_pair.second) { - auto* var_desc = block.FindVar(name); - if (var_desc == nullptr || var_desc->Persistable()) continue; - auto type = var_desc->Proto()->type().type(); - if (type != proto::VarType::LOD_TENSOR && - type != proto::VarType::SELECTED_ROWS) { - continue; - } - - auto it = ref_cnts.find(name); - if (it != ref_cnts.end()) { - ++it->second; - } else { - ref_cnts[name] = 1; - } - } - } - }; - - for (auto op_desc : block.AllOps()) { - update_ref_cnts(op_desc, op_desc->Inputs()); - update_ref_cnts(op_desc, op_desc->Outputs()); - } - return ref_cnts; -} - struct ExecutorPrepareContext { - ExecutorPrepareContext(const framework::ProgramDesc& prog, size_t block_id); + ExecutorPrepareContext(const framework::ProgramDesc& prog, size_t block_id, + const std::vector& skip_ref_cnt_vars = + std::vector()); + ~ExecutorPrepareContext(); - void ResetReferenceCount() { cur_ref_cnts_ = ref_cnts_; } + void ResetReferenceCount() { runtime_ref_cnts_ = global_ref_cnts_; } const framework::ProgramDesc& prog_; size_t block_id_; std::vector> ops_; - std::unordered_map ref_cnts_; - std::unordered_map cur_ref_cnts_; + std::unordered_map global_ref_cnts_; + std::unordered_map runtime_ref_cnts_; }; class Executor { @@ -108,10 +77,14 @@ class Executor { const std::string& fetch_holder_name = "fetch"); static std::unique_ptr Prepare( - const ProgramDesc& program, int block_id); + const ProgramDesc& program, int block_id, + const std::vector& skip_ref_cnt_vars = + std::vector()); static std::vector> Prepare( - const ProgramDesc& program, const std::vector& block_ids); + const ProgramDesc& program, const std::vector& block_ids, + const std::vector>& skip_ref_cnt_vars = + std::vector>()); void CreateVariables(const ProgramDesc& pdesc, Scope* scope, int block_id); diff --git a/paddle/fluid/framework/garbage_collector.cc b/paddle/fluid/framework/garbage_collector.cc new file mode 100644 index 0000000000000000000000000000000000000000..54d9d0dc018b08decb2ff8965659bab98e81f3ab --- /dev/null +++ b/paddle/fluid/framework/garbage_collector.cc @@ -0,0 +1,89 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
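+
+// GarbageCollector buffers the memory holders passed to Add() and releases a
+// whole batch through ClearCallback() once the buffered size reaches
+// max_memory_size_, so deallocation cost is amortized. A rough usage sketch
+// (see Add() in garbage_collector.h):
+//
+//   std::deque<std::shared_ptr<memory::Allocation>> garbages = ...;
+//   gc->Add(std::move(garbages));            // may defer the actual free
+//   gc->Add(std::move(garbages), callback);  // callback runs before freeing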
+ +#include +#ifdef PADDLE_WITH_CUDA +#include "paddle/fluid/platform/cuda_device_guard.h" +#endif +#include "paddle/fluid/framework/garbage_collector.h" + +namespace paddle { +namespace framework { + +GarbageCollector::GarbageCollector(const platform::Place &place, + size_t max_memory_size) + : max_memory_size_((std::max)(max_memory_size, static_cast(1))) { + garbages_.reset(new GarbageQueue()); + dev_ctx_ = platform::DeviceContextPool::Instance().Get(place); +} + +CPUGarbageCollector::CPUGarbageCollector(const platform::CPUPlace &place, + size_t max_memory_size) + : GarbageCollector(place, max_memory_size) {} + +void CPUGarbageCollector::ClearCallback(const std::function &callback) { + callback(); +} + +#ifdef PADDLE_WITH_CUDA +UnsafeFastGPUGarbageCollector::UnsafeFastGPUGarbageCollector( + const platform::CUDAPlace &place, size_t max_memory_size) + : GarbageCollector(place, max_memory_size) {} + +void UnsafeFastGPUGarbageCollector::ClearCallback( + const std::function &callback) { + callback(); +} + +DefaultStreamGarbageCollector::DefaultStreamGarbageCollector( + const platform::CUDAPlace &place, size_t max_memory_size) + : GarbageCollector(place, max_memory_size) {} + +void DefaultStreamGarbageCollector::Wait() const { + static_cast(this->dev_ctx_) + ->WaitStreamCallback(); +} + +void DefaultStreamGarbageCollector::ClearCallback( + const std::function &callback) { + static_cast(this->dev_ctx_) + ->AddStreamCallback(callback); +} + +StreamGarbageCollector::StreamGarbageCollector(const platform::CUDAPlace &place, + size_t max_memory_size) + : GarbageCollector(place, max_memory_size) { + platform::CUDADeviceGuard guard(place.device); + PADDLE_ENFORCE(cudaStreamCreate(&stream_)); + callback_manager_.reset(new platform::StreamCallbackManager(stream_)); +} + +StreamGarbageCollector::~StreamGarbageCollector() { + auto place = boost::get(this->dev_ctx_->GetPlace()); + platform::CUDADeviceGuard guard(place.device); + PADDLE_ENFORCE(cudaStreamSynchronize(stream_)); + PADDLE_ENFORCE(cudaStreamDestroy(stream_)); +} + +cudaStream_t StreamGarbageCollector::stream() const { return stream_; } + +void StreamGarbageCollector::Wait() const { callback_manager_->Wait(); } + +void StreamGarbageCollector::ClearCallback( + const std::function &callback) { + callback_manager_->AddCallback(callback); +} +#endif +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/garbage_collector.h b/paddle/fluid/framework/garbage_collector.h index 818b3334ea4171fd7a9cbaa896ee1672e8ecca51..2768671029c06562aa0d2e5eea3d3ff61d900ab5 100644 --- a/paddle/fluid/framework/garbage_collector.h +++ b/paddle/fluid/framework/garbage_collector.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include @@ -24,134 +23,74 @@ namespace paddle { namespace framework { -// T should have memory_size() and clear() method -template class GarbageCollector { public: - GarbageCollector(const platform::Place &place, size_t max_memory_size) - : max_memory_size_((std::max)(max_memory_size, static_cast(1))) { - garbages_.reset(new std::deque()); - dev_ctx_ = platform::DeviceContextPool::Instance().Get(place); - } + using GarbageQueue = std::deque>; - virtual ~GarbageCollector() {} + GarbageCollector(const platform::Place &place, size_t max_memory_size); - void Reset() { - std::lock_guard guard(mutex_); - garbages_.reset(new std::deque()); - cur_memory_size_ = 0; - } + virtual ~GarbageCollector() = default; + + virtual void Wait() const {} template - void Add(const Container &objs) { - Add(objs, []() {}); - } + 
void Add(Container &&objs); template - void Add(const Container &objs, Callback &&callback) { - std::shared_ptr> clear_deque; - { - std::lock_guard guard(mutex_); - for (auto *obj : objs) { - garbages_->push_back(obj); - cur_memory_size_ += obj->memory_size(); - } - if (cur_memory_size_ >= max_memory_size_) { - cur_memory_size_ = 0; - clear_deque = garbages_; - garbages_.reset(new std::deque()); - } - } - - if (clear_deque != nullptr) { - callback(); - ClearCallback([=]() { - for (auto *obj : *clear_deque) obj->clear(); - }); - } - } - - virtual void Wait() const {} + void Add(Container &&objs, Callback &&callback); protected: virtual void ClearCallback(const std::function &callback) = 0; platform::DeviceContext *dev_ctx_; - std::shared_ptr> garbages_; + std::unique_ptr garbages_; mutable std::mutex mutex_; const size_t max_memory_size_; - size_t cur_memory_size_ = 0; + size_t cur_memory_size_{0}; }; -template -class CPUGarbageCollector : public GarbageCollector { +class CPUGarbageCollector : public GarbageCollector { public: - CPUGarbageCollector(const platform::CPUPlace &place, size_t max_memory_size) - : GarbageCollector(place, max_memory_size) {} + CPUGarbageCollector(const platform::CPUPlace &place, size_t max_memory_size); protected: - void ClearCallback(const std::function &callback) override { - callback(); - } + void ClearCallback(const std::function &callback) override; }; #ifdef PADDLE_WITH_CUDA -template -class DefaultStreamGarbageCollector : public GarbageCollector { +class UnsafeFastGPUGarbageCollector : public GarbageCollector { public: - DefaultStreamGarbageCollector(const platform::CUDAPlace &place, - size_t max_memory_size) - : GarbageCollector(place, max_memory_size) {} + UnsafeFastGPUGarbageCollector(const platform::CUDAPlace &place, + size_t max_memory_size); - cudaStream_t stream() const { - return static_cast(this->dev_ctx_) - ->stream(); - } + protected: + void ClearCallback(const std::function &callback) override; +}; - void Wait() const override { - this->dev_ctx_->Wait(); - static_cast(this->dev_ctx_) - ->WaitStreamCallback(); - } +class DefaultStreamGarbageCollector : public GarbageCollector { + public: + DefaultStreamGarbageCollector(const platform::CUDAPlace &place, + size_t max_memory_size); + + void Wait() const override; protected: - void ClearCallback(const std::function &callback) override { - static_cast(this->dev_ctx_) - ->AddStreamCallback(callback); - } + void ClearCallback(const std::function &callback) override; }; -template -class StreamGarbageCollector : public GarbageCollector { +class StreamGarbageCollector : public GarbageCollector { public: StreamGarbageCollector(const platform::CUDAPlace &place, - size_t max_memory_size) - : GarbageCollector(place, max_memory_size) { - PADDLE_ENFORCE(cudaSetDevice(place.device)); - PADDLE_ENFORCE(cudaStreamCreate(&stream_)); - callback_manager_.reset(new platform::StreamCallbackManager(stream_)); - } + size_t max_memory_size); - ~StreamGarbageCollector() { - auto place = boost::get(this->dev_ctx_->GetPlace()); - PADDLE_ENFORCE(cudaSetDevice(place.device)); - PADDLE_ENFORCE(cudaStreamSynchronize(stream_)); - PADDLE_ENFORCE(cudaStreamDestroy(stream_)); - } + ~StreamGarbageCollector(); - void Wait() const override { - PADDLE_ENFORCE(cudaStreamSynchronize(stream_)); - std::lock_guard guard(this->mutex_); - callback_manager_->Wait(); - } + void Wait() const override; - cudaStream_t stream() const { return stream_; } + cudaStream_t stream() const; protected: - void ClearCallback(const std::function &callback) 
override { - std::lock_guard guard(this->mutex_); - callback_manager_->AddCallback(callback); - } + void ClearCallback(const std::function &callback) override; private: cudaStream_t stream_; @@ -159,5 +98,33 @@ class StreamGarbageCollector : public GarbageCollector { }; #endif +template +void GarbageCollector::Add(Container &&objs) { + Add(std::forward(objs), []() {}); +} + +template +void GarbageCollector::Add(Container &&objs, Callback &&callback) { + GarbageQueue *garbage_queue = nullptr; + { + std::lock_guard guard(mutex_); + for (auto &obj : objs) { + if (!obj) continue; + cur_memory_size_ += obj->size(); + garbages_->push_back(std::move(obj)); + } + if (cur_memory_size_ >= max_memory_size_) { + cur_memory_size_ = 0; + garbage_queue = garbages_.release(); + garbages_.reset(new GarbageQueue()); + } + } + + if (garbage_queue) { + callback(); + ClearCallback([garbage_queue]() { delete garbage_queue; }); + } +} + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/ir/graph.h b/paddle/fluid/framework/ir/graph.h index bb2d953afbd64dc33d735f483a8eafd9ce951530..47fcf96a3f92b1f915e5254fff36feb8b2870730 100644 --- a/paddle/fluid/framework/ir/graph.h +++ b/paddle/fluid/framework/ir/graph.h @@ -73,14 +73,21 @@ class Graph { } bool Has(const std::string &attr_name) const { - return attrs_.find(attr_name) != attrs_.end(); + return attrs_.count(attr_name) > 0; } template AttrType &Get(const std::string &attr_name) const { PADDLE_ENFORCE(Has(attr_name), "%s attr not registered for graph.", attr_name); - return *boost::any_cast(attrs_.at(attr_name)); + try { + return *boost::any_cast(attrs_.at(attr_name)); + } catch (boost::bad_any_cast &) { + PADDLE_THROW( + "Invalid attribute type of %s error, expected: %s, actual: %s", + attr_name, typeid(AttrType *).name(), + attrs_.at(attr_name).type().name()); + } } template diff --git a/paddle/fluid/framework/ir/pass.h b/paddle/fluid/framework/ir/pass.h index a3559247db6703d486ed01ce9f2058e671443096..27746ff1453b1b336da8c31497c066c338843b68 100644 --- a/paddle/fluid/framework/ir/pass.h +++ b/paddle/fluid/framework/ir/pass.h @@ -51,11 +51,18 @@ class Pass { AttrType &Get(const std::string &attr_name) const { PADDLE_ENFORCE(attrs_.find(attr_name) != attrs_.end(), "%s attr not registered for pass.", attr_name); - return *boost::any_cast(attrs_.at(attr_name)); + try { + return *boost::any_cast(attrs_.at(attr_name)); + } catch (boost::bad_any_cast &) { + PADDLE_THROW( + "Invalid attribute type of %s error, expected: %s, actual: %s", + attr_name, typeid(AttrType *).name(), + attrs_.at(attr_name).type().name()); + } } bool Has(const std::string &attr_name) const { - return attrs_.find(attr_name) != attrs_.end(); + return attrs_.count(attr_name) > 0; } void Erase(const std::string &attr_name) { diff --git a/paddle/fluid/framework/op_registry.h b/paddle/fluid/framework/op_registry.h index 36673e48c2047bca54f604b082dfec123f1e2c82..6d39bb3c524b4725dfebd6ef07594b0b45c65463 100644 --- a/paddle/fluid/framework/op_registry.h +++ b/paddle/fluid/framework/op_registry.h @@ -319,7 +319,7 @@ struct OpKernelRegistrarFunctorExGet().value()); } if (t != nullptr) { - PADDLE_ENFORCE(t->IsInitialized(), - "Input %s(%s) does not exist in Operator %s", - input.first, ipt_name, DebugString()); + PADDLE_ENFORCE(t->IsInitialized(), "Input %s is not initialized: %s", + ipt_name, DebugString()); int tmp = static_cast(ToDataType(t->type())); PADDLE_ENFORCE( tmp == data_type || data_type == -1, diff --git a/paddle/fluid/framework/parallel_executor.cc 
b/paddle/fluid/framework/parallel_executor.cc index 28a4b14b27bda65c0ff8e3e73df1fab62439af2f..eb4baa06b5284512eab128e57f893bad43afda97 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -26,6 +26,7 @@ limitations under the License. */ #include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h" #include "paddle/fluid/framework/details/multi_devices_helper.h" +#include "paddle/fluid/framework/details/reference_count_pass_helper.h" #include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h" #include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h" #include "paddle/fluid/platform/profiler.h" @@ -72,6 +73,26 @@ class ParallelExecutorPrivate { } } } + + std::unique_ptr PrepareGCAndRefCnts( + std::unique_ptr graph, size_t max_memory_size); + + inline bool HasGarbageCollectors() const { return !gcs_.empty(); } + + void ResetRuntimeReferenceCount(const std::vector &fetch_tensors, + const std::string &fetched_var_name) { + for (size_t i = 0; i < runtime_ref_cnts_.size(); ++i) { + for (auto &pair : global_ref_cnts_[i]) { + runtime_ref_cnts_[i][pair.first] = pair.second; + } + + for (auto &fetch_name : fetch_tensors) { + runtime_ref_cnts_[i].erase(fetch_name); + } + runtime_ref_cnts_[i].erase(fetched_var_name); + } + } + std::vector places_; std::vector local_scopes_; Scope *global_scope_; // not owned @@ -83,8 +104,76 @@ class ParallelExecutorPrivate { bool own_local_scope_; bool use_cuda_; bool use_all_reduce_; + + // global_ref_cnts_ is only initialized when ParallelExecutor constructs, and + // then keeps unchanged + // Before each iteration, runtime_ref_cnts_ is reset to global_ref_cnts_ + std::vector global_ref_cnts_; + std::vector runtime_ref_cnts_; + details::GarbageCollectorMap gcs_; }; +std::unique_ptr ParallelExecutorPrivate::PrepareGCAndRefCnts( + std::unique_ptr graph, size_t max_memory_size) { + for (size_t i = 0; i < places_.size(); ++i) { + auto &place = places_[i]; + if (gcs_.count(place) > 0) { + continue; + } + std::unique_ptr gc; +#ifdef PADDLE_WITH_CUDA + if (platform::is_gpu_place(place)) { + if (IsFastEagerDeletionModeEnabled()) { + gc.reset(new UnsafeFastGPUGarbageCollector( + boost::get(place), max_memory_size)); + } else { + gc.reset(new StreamGarbageCollector( + boost::get(place), max_memory_size)); + } + VLOG(10) << "Created " << i << "-th GarbageCollector at " << place; + } else { +#endif + if (platform::is_cpu_place(place)) { + gc.reset(new CPUGarbageCollector(boost::get(place), + max_memory_size)); + VLOG(10) << "Created GarbageCollector at " << place; + } else { + PADDLE_THROW("Unsupported place for garbage collection"); + } +#ifdef PADDLE_WITH_CUDA + } +#endif + + gcs_.emplace(place, std::move(gc)); + } + + if (!gcs_.empty()) { + std::vector last_live_ops_of_vars; + + auto ref_cnt_pass = + ir::PassRegistry::Instance().Get("reference_count_pass"); + ref_cnt_pass->SetNotOwned(details::kGlobalReferenceCount, + &global_ref_cnts_); + ref_cnt_pass->SetNotOwned(details::kLastLiveOpsOfVars, + &last_live_ops_of_vars); + graph = ref_cnt_pass->Apply(std::move(graph)); + VLOG(10) << "ReferenceCountPass Applied"; + + auto eager_deletion_pass = + ir::PassRegistry::Instance().Get("eager_deletion_pass"); + eager_deletion_pass->SetNotOwned(details::kRuntimeReferenceCount, + &runtime_ref_cnts_); + eager_deletion_pass->SetNotOwned(details::kGarbageCollector, &gcs_); + eager_deletion_pass->SetNotOwned(details::kLastLiveOpsOfVars, + &last_live_ops_of_vars); + 
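+ // Note: reference_count_pass (applied above) computes last_live_ops_of_vars,
+ // i.e. the last operator(s) that touch each variable; eager_deletion_pass
+ // consumes it, together with the runtime reference counts and the per-place
+ // garbage collectors, to insert eager-deletion ops behind those last users.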
eager_deletion_pass->SetNotOwned(details::kAllPlaces, &places_); + graph = eager_deletion_pass->Apply(std::move(graph)); + VLOG(10) << "EagerDeletionPass Applied"; + } + + return graph; +} + std::vector &ParallelExecutor::GetLocalScopes() { return member_->local_scopes_; } @@ -151,36 +240,18 @@ ParallelExecutor::ParallelExecutor( std::unique_ptr graph = build_strategy.Apply( main_program, member_->places_, loss_var_name, params, member_->local_scopes_, member_->use_cuda_, member_->nccl_ctxs_.get()); - - auto max_memory_size = GetEagerDeletionThreshold(); - if (max_memory_size >= 0) { - for (auto &place : member_->places_) { - if (!platform::is_gpu_place(place)) continue; - auto gpu_place = boost::get(place); - if (gcs_[gpu_place.device] == nullptr) { - ref_cnts_[gpu_place.device].reset(new details::ReferenceCountMap()); - cur_ref_cnts_[gpu_place.device].reset( - new details::AtomicReferenceCountMap()); - gcs_[gpu_place.device].reset( - new StreamGarbageCollector(gpu_place, max_memory_size)); - } - } - if (!gcs_.empty()) { - auto ref_cnt_pass = - ir::PassRegistry::Instance().Get("reference_count_pass"); - ref_cnt_pass->SetNotOwned(details::kGlobalReferenceCount, &ref_cnts_); - ref_cnt_pass->SetNotOwned(details::kCurReferenceCount, &cur_ref_cnts_); - ref_cnt_pass->SetNotOwned(details::kGarbageCollector, &gcs_); - graph = ref_cnt_pass->Apply(std::move(graph)); - graph->SetNotOwned("garbage_collector", &gcs_); - } - } #else std::unique_ptr graph = build_strategy.Apply(main_program, member_->places_, loss_var_name, params, member_->local_scopes_, member_->use_cuda_); #endif + auto max_memory_size = GetEagerDeletionThreshold(); + if (max_memory_size >= 0) { + graph = member_->PrepareGCAndRefCnts(std::move(graph), + static_cast(max_memory_size)); + } + // Step 3. Create vars in each scope. Passes may also create new vars. // skip control vars and empty vars std::vector var_infos; @@ -300,18 +371,9 @@ void ParallelExecutor::Run(const std::vector &fetch_tensors, #endif platform::RecordBlock b(0); -#ifdef PADDLE_WITH_CUDA - if (!gcs_.empty()) { - ResetReferenceCount(); - for (auto &pair : cur_ref_cnts_) { - auto &name_map = *(pair.second); - for (auto &fetch_name : fetch_tensors) { - name_map.erase(fetch_name); - } - name_map.erase(fetched_var_name); - } + if (member_->HasGarbageCollectors()) { + member_->ResetRuntimeReferenceCount(fetch_tensors, fetched_var_name); } -#endif auto fetch_data = member_->executor_->Run(fetch_tensors); *member_->global_scope_->Var(fetched_var_name)->GetMutable() = fetch_data; @@ -355,13 +417,11 @@ ParallelExecutor::~ParallelExecutor() { for (auto &p : member_->places_) { platform::DeviceContextPool::Instance().Get(p)->Wait(); } - // member_ must be destructed before gcs_ since the destructor of - // ReferenceCountOpHandle use raw pointers of gcs_ inside. - member_.reset(); + delete member_; } } // namespace framework } // namespace paddle -#ifdef PADDLE_WITH_CUDA + USE_PASS(reference_count_pass); -#endif +USE_PASS(eager_deletion_pass); diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index ef09b98b2aa91a9d729b94d15dbb676dde4092b6..1fc17a0d64d50eb70ce66cacd4752a5b96d5e894 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -14,7 +14,6 @@ limitations under the License. */ #pragma once -#include #include #include #include @@ -29,10 +28,6 @@ limitations under the License. 
*/ #include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/platform/device_context.h" -#ifdef PADDLE_WITH_CUDA -#include "paddle/fluid/framework/details/reference_count_pass.h" -#endif - namespace paddle { namespace framework { @@ -75,24 +70,7 @@ class ParallelExecutor { private: void BCastParamsToDevices(const std::unordered_set &vars) const; - std::unique_ptr member_; - -#ifdef PADDLE_WITH_CUDA - // ref_cnts_ is only initialized when ParallelExecutor constructs, and then - // keeps unchanged - // Before each iteration, cur_ref_cnts_ is reset to ref_cnts_ - details::DeviceReferenceCountMap ref_cnts_; - details::AtomicDeviceReferenceCountMap cur_ref_cnts_; - details::DeviceGarbageCollectorMap gcs_; - - void ResetReferenceCount() { - for (auto &pair1 : ref_cnts_) { - for (auto &pair2 : *(pair1.second)) { - (*(cur_ref_cnts_[pair1.first]))[pair2.first] = pair2.second; - } - } - } -#endif + ParallelExecutorPrivate *member_; }; } // namespace framework diff --git a/paddle/fluid/framework/scope.cc b/paddle/fluid/framework/scope.cc index 0d261dd7ccc323abddd2c3ef13f1874661a8ca75..6fa5e99f9f3a7e871f1a742a30803853988ea6eb 100644 --- a/paddle/fluid/framework/scope.cc +++ b/paddle/fluid/framework/scope.cc @@ -38,6 +38,10 @@ DEFINE_double( "Memory size threshold (GB) when the garbage collector clear tensors." "Disabled when this value is less than 0"); +DEFINE_bool(fast_eager_deletion_mode, false, + "Fast eager deletion mode. If enabled, memory would release " + "immediately without waiting GPU kernel ends."); + // When in inference scenario, the scopes will not be written by two threads in // a mean time, but a scope may be read by multiple threads concurrently, and // the mutex will cause serious performance issue. @@ -58,6 +62,8 @@ int64_t GetEagerDeletionThreshold() { (static_cast(1) << 30)); } +bool IsFastEagerDeletionModeEnabled() { return FLAGS_fast_eager_deletion_mode; } + Scope::~Scope() { DropKids(); } Scope& Scope::NewScope() const { diff --git a/paddle/fluid/framework/scope.h b/paddle/fluid/framework/scope.h index 1901ffbe57e0d85193c3a218f06eba06a0f287a5..aded1f771cedbf2442ad36d7fab3e6e6caffdc24 100644 --- a/paddle/fluid/framework/scope.h +++ b/paddle/fluid/framework/scope.h @@ -27,6 +27,7 @@ namespace paddle { namespace framework { int64_t GetEagerDeletionThreshold(); +bool IsFastEagerDeletionModeEnabled(); class Scope; diff --git a/paddle/fluid/framework/tensor.h b/paddle/fluid/framework/tensor.h index 71e8badd4b6b08e7d380fd45d93a33176172081d..153222506af02911b792907d392a42e5499c5e7a 100644 --- a/paddle/fluid/framework/tensor.h +++ b/paddle/fluid/framework/tensor.h @@ -158,6 +158,10 @@ class Tensor { const std::shared_ptr& Holder() const { return holder_; } size_t offset() const { return offset_; } + std::shared_ptr MoveMemoryHolder() { + return std::move(holder_); + } + private: /*! holds the memory block if allocated. 
*/ std::shared_ptr holder_;
diff --git a/paddle/fluid/operators/controlflow/while_op.cc b/paddle/fluid/operators/controlflow/while_op.cc index 6c1b2f329a59e1b27caad2996308b33b3a72de1d..5ab0918c486cc56c7d55f24f4952a013044971ee 100644 --- a/paddle/fluid/operators/controlflow/while_op.cc +++ b/paddle/fluid/operators/controlflow/while_op.cc @@ -32,6 +32,20 @@ static constexpr char kStepScopes[] = "StepScopes"; static constexpr char kX[] = "X"; static constexpr char kXGRAD[] = "X@GRAD"; static constexpr char kOutputs[] = "Out"; +static constexpr char kSkipEagerDeletionVars[] = "skip_eager_deletion_vars"; + +namespace { // NOLINT +static std::string GetSkipEagerDeletionVarsDebugString( + const std::vector<std::string> &vars) { + std::string str = "Skip " + std::to_string(vars.size()) + + " var(s) in eager deletion mode: "; + for (auto &var : vars) { + str.append(var); + str.push_back(' '); + } + return str; +} +} // NOLINT class WhileOp : public framework::OperatorBase { public: @@ -59,7 +73,10 @@ class WhileOp : public framework::OperatorBase { "Condition of while op must in CPU memory."); bool is_test = Attr("is_test"); - auto ctx = executor.Prepare(*program, block->ID()); + auto &skip_vars = Attr<std::vector<std::string>>(kSkipEagerDeletionVars); + VLOG(2) << GetSkipEagerDeletionVarsDebugString(skip_vars); + + auto ctx = executor.Prepare(*program, block->ID(), skip_vars); while (cond.data()[0]) { auto &current_scope = scope.NewScope(); step_scopes->push_back(&current_scope); @@ -96,6 +113,10 @@ class WhileOpMaker : public framework::OpProtoAndCheckerMaker { "(bool, default false) Set to true for inference only, false " "for training. Some layers may run faster when this is true.") .SetDefault(false); + AddAttr<std::vector<std::string>>(kSkipEagerDeletionVars, + "Vars that would skip eager deletion. " + "Users should not set this attribute manually.") + .SetDefault(std::vector<std::string>()); AddComment(R"DOC( )DOC"); } @@ -119,7 +140,10 @@ class WhileGradOp : public framework::OperatorBase { framework::Executor executor(dev_place); auto *block = Attr(kStepBlock); auto *program = block->Program(); - auto ctx = executor.Prepare(*program, block->ID()); + + auto &skip_vars = Attr<std::vector<std::string>>(kSkipEagerDeletionVars); + VLOG(2) << GetSkipEagerDeletionVarsDebugString(skip_vars); + auto ctx = executor.Prepare(*program, block->ID(), skip_vars); auto *step_scopes = scope.FindVar(Input(kStepScopes))->GetMutable(); @@ -341,6 +365,8 @@ class WhileGradOpDescMaker : public framework::SingleGradOpDescMaker { // while operator could be renamed. while_grad->SetAttr("original_output_grad", output_grads_list); + while_grad->SetAttr(kSkipEagerDeletionVars, std::vector<std::string>()); + return std::unique_ptr(while_grad); } };
diff --git a/paddle/fluid/operators/psroi_pool_op.cc b/paddle/fluid/operators/psroi_pool_op.cc new file mode 100644 index 0000000000000000000000000000000000000000..6978d9c5dc5993e64793f420a63dcca020f47868 --- /dev/null +++ b/paddle/fluid/operators/psroi_pool_op.cc @@ -0,0 +1,173 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
*/ + +#include "paddle/fluid/operators/psroi_pool_op.h" + +namespace paddle { +namespace operators { + +using Tensor = framework::Tensor; +using LoDTensor = framework::LoDTensor; + +class PSROIPoolOpMaker : public framework::OpProtoAndCheckerMaker { + public: + void Make() override { + AddInput("X", + "(Tensor), " + "the input of PSROIPoolOp. " + "The format of the input tensor is NCHW, where N is the batch " + "size, C is the number of input channels, " + "H is the height of the input feature map, and " + "W is the width."); + AddInput("ROIs", + "(LoDTensor), " + "ROIs (Regions of Interest) to pool over. " + "Should be a 2-D LoDTensor of shape (num_rois, 4) " + "given as [(x1, y1, x2, y2), ...], " + "where (x1, y1) is the top-left coordinate and " + "(x2, y2) is the bottom-right coordinate. " + "The roi batch index can be calculated from the LoD."); + AddOutput("Out", + "(Tensor), " + "the output of PSROIPoolOp is a 4-D Tensor with shape " + "(num_rois, output_channels, pooled_h, pooled_w)."); + AddAttr<int>( + "output_channels", + "(int), " + "the number of channels of the output feature map. " + "For a task of C classes of objects, output_channels should be " + "(C + 1) for classification only."); + AddAttr<float>("spatial_scale", + "(float, default 1.0), " + "Multiplicative spatial scale factor " + "to translate ROI coords from their input scale " + "to the scale used when pooling.") + .SetDefault(1.0); + AddAttr<int>("pooled_height", + "(int, default 1), " + "the pooled output height.") + .SetDefault(1); + AddAttr<int>("pooled_width", + "(int, default 1), " + "the pooled output width.") + .SetDefault(1); + AddComment(R"Doc( +**PSROIPool Operator** + +Position-sensitive region-of-interest pooling (also known as PSROIPooling) performs +position-sensitive average pooling on regions of interest. It takes as input N +position-sensitive score maps and a list of num_rois regions of interest. + +PSROIPooling is used in R-FCN; please refer to https://arxiv.org/abs/1605.06409 for more details.
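+
+For an output bin (c, ph, pw) of an ROI, the operator averages the input over the
+bin's window, reading from input channel (c * pooled_height + ph) * pooled_width + pw,
+so each output position is pooled from its own position-specific score map. Empty
+bins produce 0.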
+ )Doc"); + } +}; + +class PSROIPoolOp : public framework::OperatorWithKernel { + public: + using framework::OperatorWithKernel::OperatorWithKernel; + + void InferShape(framework::InferShapeContext* ctx) const override { + PADDLE_ENFORCE(ctx->HasInput("X"), + "Input(X) of PSROIPoolOp should not be null."); + PADDLE_ENFORCE(ctx->HasInput("ROIs"), + "Input(ROIs) of PSROIPoolOp should not be null."); + PADDLE_ENFORCE(ctx->HasOutput("Out"), + "Output(Out) of PSROIPoolOp should not be null."); + auto input_dims = ctx->GetInputDim("X"); + auto rois_dims = ctx->GetInputDim("ROIs"); + + PADDLE_ENFORCE(input_dims.size() == 4, + "The format of input tensor is NCHW"); + PADDLE_ENFORCE(rois_dims.size() == 2, + "ROIs should be a 2-D LoDTensor of shape (num_rois, 4) " + "given as [(x1, y1, x2, y2), ...]"); + PADDLE_ENFORCE(rois_dims[1] == 4, + "ROIs should be a 2-D LoDTensor of shape (num_rois, 4) " + "given as [(x1, y1, x2, y2), ...]"); + + int pooled_height = ctx->Attrs().Get("pooled_height"); + int pooled_width = ctx->Attrs().Get("pooled_width"); + int output_channels = ctx->Attrs().Get("output_channels"); + float spatial_scale = ctx->Attrs().Get("spatial_scale"); + + PADDLE_ENFORCE( + input_dims[1] == output_channels * pooled_height * pooled_width, + "the channel of X(%d) should be equal to the product of " + "output_channels(%d), pooled_height(%d) and pooled_width(%d)", + input_dims[1], output_channels, pooled_height, pooled_width); + + PADDLE_ENFORCE_GT(pooled_height, 0, + "The pooled output height must be greater than 0"); + PADDLE_ENFORCE_GT(pooled_width, 0, + "The pooled output width must be greater than 0"); + PADDLE_ENFORCE_GT(output_channels, 1, + "The pooled output channels must greater than 1"); + PADDLE_ENFORCE_GT(spatial_scale, 0.0f, + "The spatial scale must greater than 0."); + + auto out_dims = input_dims; + out_dims[0] = rois_dims[0]; + out_dims[1] = + output_channels; // input_dims[1] / (pooled_height * pooled_width); + out_dims[2] = pooled_height; + out_dims[3] = pooled_width; + ctx->SetOutputDim("Out", out_dims); + } + + protected: + framework::OpKernelType GetExpectedKernelType( + const framework::ExecutionContext& ctx) const override { + return framework::OpKernelType( + framework::ToDataType(ctx.Input("X")->type()), + ctx.device_context()); + } +}; + +class PSROIPoolGradOp : public framework::OperatorWithKernel { + public: + using framework::OperatorWithKernel::OperatorWithKernel; + + void InferShape(framework::InferShapeContext* ctx) const override { + PADDLE_ENFORCE(ctx->HasInput(framework::GradVarName("Out")), + "The gradient of Out should not be null."); + PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("X")), + "The gradient of X should not be null."); + ctx->SetOutputDim(framework::GradVarName("X"), ctx->GetInputDim("X")); + } + + protected: + framework::OpKernelType GetExpectedKernelType( + const framework::ExecutionContext& ctx) const override { + return framework::OpKernelType( + framework::ToDataType(ctx.Input("X")->type()), + ctx.device_context()); + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; +REGISTER_OPERATOR(psroi_pool, ops::PSROIPoolOp, ops::PSROIPoolOpMaker, + paddle::framework::DefaultGradOpDescMaker); +REGISTER_OPERATOR(psroi_pool_grad, ops::PSROIPoolGradOp); +REGISTER_OP_CPU_KERNEL( + psroi_pool, + ops::CPUPSROIPoolOpKernel, + ops::CPUPSROIPoolOpKernel); +REGISTER_OP_CPU_KERNEL( + psroi_pool_grad, + ops::CPUPSROIPoolGradOpKernel, + ops::CPUPSROIPoolGradOpKernel); diff --git 
a/paddle/fluid/operators/psroi_pool_op.cu b/paddle/fluid/operators/psroi_pool_op.cu new file mode 100644 index 0000000000000000000000000000000000000000..22fec3244fabe5ca466202784c0cce372d0bf6e5 --- /dev/null +++ b/paddle/fluid/operators/psroi_pool_op.cu @@ -0,0 +1,294 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/operators/psroi_pool_op.h" +#include "paddle/fluid/platform/cuda_primitives.h" + +namespace paddle { +namespace operators { + +using Tensor = framework::Tensor; +using LoDTensor = framework::LoDTensor; + +static constexpr int kNumCUDAThreads = 512; +static constexpr int kNumMaximumNumBlocks = 4096; + +static inline int NumBlocks(const int N) { + return std::min((N + kNumCUDAThreads - 1) / kNumCUDAThreads, + kNumMaximumNumBlocks); +} + +template +__global__ void GPUPSROIPoolForward( + const int nthreads, const T* input_data, const T* input_rois, + const float spatial_scale, const int input_channels, const int height, + const int width, const int output_channels, const int pooled_height, + const int pooled_width, const int* rois_batch_id_data, T* output_data) { + int index = blockIdx.x * blockDim.x + threadIdx.x; + int offset = blockDim.x * gridDim.x; + for (size_t i = index; i < nthreads; i += offset) { + // The output is in order (n, c, ph, pw) + int pw = i % pooled_width; + int ph = (i / pooled_width) % pooled_height; + int c = (i / pooled_width / pooled_height) % output_channels; + int n = i / pooled_width / pooled_height / output_channels; + + // set roi_batch_id + int roi_batch_id = rois_batch_id_data[n]; + + // [start, end) interval for spatial sampling + const T* offset_input_rois = input_rois + n * 4; + T roi_start_w = static_cast(round(offset_input_rois[0])) * spatial_scale; + T roi_start_h = static_cast(round(offset_input_rois[1])) * spatial_scale; + T roi_end_w = + static_cast(round(offset_input_rois[2]) + 1.) * spatial_scale; + T roi_end_h = + static_cast(round(offset_input_rois[3]) + 1.) 
* spatial_scale; + + // Force too small ROIs to be 1x1 + T roi_height = max(roi_end_h - roi_start_h, (T)0.1); // avoid 0 + T roi_width = max(roi_end_w - roi_start_w, (T)0.1); + + // Compute w and h at input feature map + T bin_size_h = roi_height / static_cast(pooled_height); + T bin_size_w = roi_width / static_cast(pooled_width); + + int hstart = floor(bin_size_h * static_cast(ph) + roi_start_h); + int wstart = floor(bin_size_w * static_cast(pw) + roi_start_w); + int hend = ceil(bin_size_h * static_cast(ph + 1) + roi_start_h); + int wend = ceil(bin_size_w * static_cast(pw + 1) + roi_start_w); + + // Add roi offsets and clip to input boundaries + hstart = min(max(hstart, 0), height); + hend = min(max(hend, 0), height); + wstart = min(max(wstart, 0), width); + wend = min(max(wend, 0), width); + bool is_empty = (hend <= hstart) || (wend <= wstart); + + int input_channel = (c * pooled_height + ph) * pooled_width + pw; + const T* offset_input_data = + input_data + + (roi_batch_id * input_channels + input_channel) * height * width; + T outsum = 0; + + for (int ih = hstart; ih < hend; ++ih) { + for (int iw = wstart; iw < wend; ++iw) { + int input_index = ih * width + iw; + outsum += offset_input_data[input_index]; + } + } + + T bin_area = static_cast((hend - hstart) * (wend - wstart)); + output_data[i] = is_empty ? 0. : outsum / bin_area; + } +} + +template +__global__ void GPUPSROIPoolBackward( + const int nthreads, const T* input_rois, const T* output_grad_data, + const float spatial_scale, const int input_channels, const int height, + const int width, const int output_channels, const int pooled_height, + const int pooled_width, const int* rois_batch_id_data, T* input_grad_data) { + int index = blockIdx.x * blockDim.x + threadIdx.x; + int offset = blockDim.x * gridDim.x; + for (int i = index; i < nthreads; i += offset) { + // The output is in order (n, c, ph, pw) + int pw = i % pooled_width; + int ph = (i / pooled_width) % pooled_height; + int c = (i / pooled_width / pooled_height) % output_channels; + int n = i / pooled_width / pooled_height / output_channels; + + // set roi_batch_id + int roi_batch_id = rois_batch_id_data[n]; + int input_channel = (c * pooled_height + ph) * pooled_width + pw; + int input_offset = + (roi_batch_id * input_channels + input_channel) * height * width; + T* offset_input_grad_data = input_grad_data + input_offset; + + // [start, end) interval for spatial sampling + const T* offset_input_rois = input_rois + n * 4; + T roi_start_w = static_cast(round(offset_input_rois[0])) * spatial_scale; + T roi_start_h = static_cast(round(offset_input_rois[1])) * spatial_scale; + T roi_end_w = + static_cast(round(offset_input_rois[2]) + 1.) * spatial_scale; + T roi_end_h = + static_cast(round(offset_input_rois[3]) + 1.) 
* spatial_scale; + + // Force too small ROIs to be 1x1 + T roi_height = max(roi_end_h - roi_start_h, (T)0.1); // avoid 0 + T roi_width = max(roi_end_w - roi_start_w, (T)0.1); + + // Compute w and h at input feature map + T bin_size_h = roi_height / static_cast(pooled_height); + T bin_size_w = roi_width / static_cast(pooled_width); + + int hstart = floor(bin_size_h * static_cast(ph) + roi_start_h); + int wstart = floor(bin_size_w * static_cast(pw) + roi_start_w); + int hend = ceil(bin_size_h * static_cast(ph + 1) + roi_start_h); + int wend = ceil(bin_size_w * static_cast(pw + 1) + roi_start_w); + + // Add roi offsets and clip to input boundaries + hstart = min(max(hstart, 0), height); + hend = min(max(hend, 0), height); + wstart = min(max(wstart, 0), width); + wend = min(max(wend, 0), width); + bool is_empty = (hend <= hstart) || (wend <= wstart); + + // Accumulate diff_val into input data + T bin_area = static_cast((hend - hstart) * (wend - wstart)); + T diff_val = is_empty ? 0. : output_grad_data[i] / bin_area; + for (int ih = hstart; ih < hend; ++ih) { + for (int iw = wstart; iw < wend; ++iw) { + int input_index = ih * width + iw; + platform::CudaAtomicAdd(offset_input_grad_data + input_index, diff_val); + } + } + } +} + +template +class GPUPSROIPoolOpKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext& ctx) const override { + auto* in = ctx.Input("X"); + auto* rois = ctx.Input("ROIs"); + auto* out = ctx.Output("Out"); + + auto pooled_height = ctx.Attr("pooled_height"); + auto pooled_width = ctx.Attr("pooled_width"); + auto output_channels = ctx.Attr("output_channels"); + auto spatial_scale = ctx.Attr("spatial_scale"); + + auto in_dims = in->dims(); + int batch_size = in_dims[0]; + int input_channels = in_dims[1]; + int height = in_dims[2]; + int width = in_dims[3]; + + PADDLE_ENFORCE_EQ(input_channels, + output_channels * pooled_height * pooled_width, + "the channels of input X should equal the product of " + "output_channels x pooled_height x pooled_width"); + + int rois_num = rois->dims()[0]; + if (rois_num == 0) return; + + auto rois_lod = rois->lod().back(); + int rois_batch_size = rois_lod.size() - 1; + PADDLE_ENFORCE_EQ( + rois_batch_size, batch_size, + "The rois_batch_size and input(X) batch_size must be the same."); + int rois_num_with_lod = rois_lod[rois_batch_size]; + PADDLE_ENFORCE_EQ(rois_num, rois_num_with_lod, + "The rois_num from input and lod must be the same."); + + // set rois batch id + framework::Tensor rois_batch_id_list; + rois_batch_id_list.Resize({rois_num}); + int* rois_batch_id_data = + rois_batch_id_list.mutable_data(platform::CPUPlace()); + for (int n = 0; n < rois_batch_size; ++n) { + for (size_t i = rois_lod[n]; i < rois_lod[n + 1]; ++i) { + rois_batch_id_data[i] = n; + } + } + + framework::Tensor rois_batch_id_list_gpu; + framework::TensorCopy(rois_batch_id_list, ctx.GetPlace(), + ctx.device_context(), &rois_batch_id_list_gpu); + + int output_size = out->numel(); + int blocks = NumBlocks(output_size); + int threads = kNumCUDAThreads; + + // call cuda kernel function + GPUPSROIPoolForward< + T><<>>( + output_size, in->data(), rois->data(), spatial_scale, + input_channels, height, width, output_channels, pooled_height, + pooled_width, rois_batch_id_list_gpu.data(), + out->mutable_data(ctx.GetPlace())); + } +}; + +template +class GPUPSROIPoolGradOpKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext& ctx) const override { + auto* in = ctx.Input("X"); + auto* rois = 
ctx.Input("ROIs"); + + auto* output_grad = ctx.Input(framework::GradVarName("Out")); + auto* input_grad = ctx.Output(framework::GradVarName("X")); + + auto pooled_height = ctx.Attr("pooled_height"); + auto pooled_width = ctx.Attr("pooled_width"); + auto output_channels = ctx.Attr("output_channels"); + auto spatial_scale = ctx.Attr("spatial_scale"); + + int rois_num = rois->dims()[0]; + int input_channels = in->dims()[1]; + int height = in->dims()[2]; + int width = in->dims()[3]; + + if (input_grad) { + // set roi batch id + framework::Tensor rois_batch_id_list; + rois_batch_id_list.Resize({rois_num}); + int* rois_batch_id_data = + rois_batch_id_list.mutable_data(platform::CPUPlace()); + auto rois_lod = rois->lod().back(); + int rois_batch_size = rois_lod.size() - 1; + for (int n = 0; n < rois_batch_size; ++n) { + for (size_t i = rois_lod[n]; i < rois_lod[n + 1]; ++i) { + rois_batch_id_data[i] = n; + } + } + + framework::Tensor rois_batch_id_list_gpu; + framework::TensorCopy(rois_batch_id_list, ctx.GetPlace(), + ctx.device_context(), &rois_batch_id_list_gpu); + + input_grad->mutable_data(ctx.GetPlace()); + math::SetConstant set_zero; + set_zero(ctx.cuda_device_context(), input_grad, static_cast(0)); + + int output_grad_size = output_grad->numel(); + int blocks = NumBlocks(output_grad_size); + int threads = kNumCUDAThreads; + + if (output_grad_size > 0) { + GPUPSROIPoolBackward< + T><<>>( + output_grad_size, rois->data(), output_grad->data(), + spatial_scale, input_channels, height, width, output_channels, + pooled_height, pooled_width, rois_batch_id_list_gpu.data(), + input_grad->mutable_data(ctx.GetPlace())); + } + } + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; +REGISTER_OP_CUDA_KERNEL( + psroi_pool, + ops::GPUPSROIPoolOpKernel, + ops::GPUPSROIPoolOpKernel); +REGISTER_OP_CUDA_KERNEL( + psroi_pool_grad, + ops::GPUPSROIPoolGradOpKernel, + ops::GPUPSROIPoolGradOpKernel); diff --git a/paddle/fluid/operators/psroi_pool_op.h b/paddle/fluid/operators/psroi_pool_op.h new file mode 100644 index 0000000000000000000000000000000000000000..1a424728f7f6c4034242fb998d5121804e38702b --- /dev/null +++ b/paddle/fluid/operators/psroi_pool_op.h @@ -0,0 +1,253 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#pragma once +#include +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/operators/math/math_function.h" + +namespace paddle { +namespace operators { + +template +class CPUPSROIPoolOpKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext& ctx) const override { + auto* in = ctx.Input("X"); + auto* rois = ctx.Input("ROIs"); + auto* out = ctx.Output("Out"); + + auto pooled_height = ctx.Attr("pooled_height"); + auto pooled_width = ctx.Attr("pooled_width"); + auto spatial_scale = ctx.Attr("spatial_scale"); + auto output_channels = ctx.Attr("output_channels"); + + auto in_dims = in->dims(); + int batch_size = in_dims[0]; + int input_channels = in_dims[1]; + int height = in_dims[2]; + int width = in_dims[3]; + int rois_num = rois->dims()[0]; + + auto in_stride = framework::stride(in_dims); + auto roi_stride = framework::stride(rois->dims()); + auto out_stride = framework::stride(out->dims()); + + const T* input_data = in->data(); + + framework::Tensor rois_batch_id_list; + rois_batch_id_list.Resize({rois_num}); + int* rois_batch_id_data = + rois_batch_id_list.mutable_data(ctx.GetPlace()); + + auto rois_lod = rois->lod().back(); + int rois_batch_size = rois_lod.size() - 1; + PADDLE_ENFORCE_EQ( + rois_batch_size, batch_size, + "the rois_batch_size and input(X) batch_size should be the same."); + int rois_num_with_lod = rois_lod[rois_batch_size]; + PADDLE_ENFORCE_EQ(rois_num_with_lod, rois_num, + "the rois_num from input and lod must be the same"); + + PADDLE_ENFORCE_EQ(input_channels, + output_channels * pooled_height * pooled_width, + "the channels of input X should equal the product of " + "output_channels x pooled_height x pooled_width"); + + // calculate batch id index for each roi according to LoD + for (int n = 0; n < rois_batch_size; ++n) { + for (size_t i = rois_lod[n]; i < rois_lod[n + 1]; ++i) { + rois_batch_id_data[i] = n; + } + } + + T* output_data = out->mutable_data(ctx.GetPlace()); + const T* input_rois = rois->data(); + + // calculate psroipooling, parallel processing can be implemented per ROI + for (int n = 0; n < rois_num; ++n) { + // set roi batch id + int roi_batch_id = rois_batch_id_data[n]; + + // [start, end) interval for spatial sampling + const T* offset_input_rois = input_rois + n * 4; + T roi_start_w = + static_cast(round(offset_input_rois[0])) * spatial_scale; + T roi_start_h = + static_cast(round(offset_input_rois[1])) * spatial_scale; + T roi_end_w = + static_cast(round(offset_input_rois[2]) + 1.) * spatial_scale; + T roi_end_h = + static_cast(round(offset_input_rois[3]) + 1.) * spatial_scale; + + // Force too small rois to be 1 x 1 + T roi_height = std::max(roi_end_h - roi_start_h, (T)0.1); // avoid 0 + T roi_width = std::max(roi_end_w - roi_start_w, (T)0.1); + + // Compute bin size w and h at input feature map + T bin_size_h = roi_height / static_cast(pooled_height); + T bin_size_w = roi_width / static_cast(pooled_width); + + // calculate each pixel of the output feature map. 
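+ // For each bin (c, ph, pw): project the bin onto the input feature map via
+ // bin_size_h/bin_size_w, clip the window to the image boundary, and average
+ // the bin's position-specific input channel over that window.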
+ int out_roi_offset = n * out_stride[0]; + for (int c = 0; c < output_channels; ++c) { + // per category + int out_plane_offset = out_roi_offset + c * out_stride[1]; + for (int ph = 0; ph < pooled_height; ++ph) { + int out_row_offset = out_plane_offset + ph * out_stride[2]; + for (int pw = 0; pw < pooled_width; ++pw) { + // calculate w and h at input feature map + int hstart = floor(static_cast(ph) * bin_size_h + roi_start_h); + int wstart = floor(static_cast(pw) * bin_size_w + roi_start_w); + int hend = ceil(static_cast(ph + 1) * bin_size_h + roi_start_h); + int wend = ceil(static_cast(pw + 1) * bin_size_w + roi_start_w); + // Add roi offsets and clip to input boundaries + hstart = std::min(std::max(hstart, 0), height); + wstart = std::min(std::max(wstart, 0), width); + hend = std::min(std::max(hend, 0), height); + wend = std::min(std::max(wend, 0), width); + + int output_index = out_row_offset + pw; + int input_channel = (c * pooled_height + ph) * pooled_width + pw; + int input_plane_offset = + roi_batch_id * in_stride[0] + input_channel * in_stride[1]; + const T* offset_input_data = input_data + input_plane_offset; + T out_sum = 0.; + bool is_empty = (hend <= hstart) || (wend <= wstart); + for (int ih = hstart; ih < hend; ++ih) { + for (int iw = wstart; iw < wend; ++iw) { + int input_index = ih * in_stride[2] + iw; + out_sum += offset_input_data[input_index]; + } + } + T bin_area = (hend - hstart) * (wend - wstart); + output_data[output_index] = is_empty ? 0. : out_sum / bin_area; + } + } + } + } + return; + } +}; + +template +class CPUPSROIPoolGradOpKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext& ctx) const override { + auto* in = ctx.Input("X"); + auto* rois = ctx.Input("ROIs"); + auto* output_grad = + ctx.Input(framework::GradVarName("Out")); + auto* input_grad = + ctx.Output(framework::GradVarName("X")); + + auto pooled_height = ctx.Attr("pooled_height"); + auto pooled_width = ctx.Attr("pooled_width"); + auto output_channels = ctx.Attr("output_channels"); + auto spatial_scale = ctx.Attr("spatial_scale"); + + if (input_grad) { + auto in_dims = in->dims(); + int input_channels = in_dims[1]; + int height = in_dims[2]; + int width = in_dims[3]; + int rois_num = rois->dims()[0]; + + // set roi batch id + framework::Tensor rois_batch_id_list; + rois_batch_id_list.Resize({rois_num}); + int* rois_batch_id_data = + rois_batch_id_list.mutable_data(ctx.GetPlace()); + auto rois_lod = rois->lod().back(); + int rois_batch_size = rois_lod.size() - 1; + // calculate batch id index for each roi according to LoD + for (int n = 0; n < rois_batch_size; ++n) { + for (size_t i = rois_lod[n]; i < rois_lod[n + 1]; ++i) { + rois_batch_id_data[i] = n; + } + } + + const T* input_rois = rois->data(); + const T* output_grad_data = output_grad->data(); + T* input_grad_data = input_grad->mutable_data(ctx.GetPlace()); + + // set gradient of X to be 0. before backpropagate. 
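+ // Each output gradient is divided by its bin area and then accumulated back
+ // uniformly into every input pixel of the corresponding pooling window.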
+ math::SetConstant set_zero; + set_zero(ctx.template device_context(), input_grad, + static_cast(0)); + + // backpropagate gradient per output pixel + int output_grad_size = output_grad->numel(); + for (int i = 0; i < output_grad_size; ++i) { + // The output is in order (n, c, ph, pw) + int pw = i % pooled_width; + int ph = (i / pooled_width) % pooled_height; + int c = (i / pooled_width / pooled_height) % output_channels; + int n = i / pooled_width / pooled_height / output_channels; + + // set roi_batch_id + int roi_batch_id = rois_batch_id_data[n]; + int input_channel = (c * pooled_height + ph) * pooled_width + pw; + int input_offset = + (roi_batch_id * input_channels + input_channel) * height * width; + T* offset_input_grad_data = input_grad_data + input_offset; + + // [start, end) interval for spatial sampling + const T* offset_input_rois = input_rois + n * 4; + T roi_start_w = + static_cast(round(offset_input_rois[0])) * spatial_scale; + T roi_start_h = + static_cast(round(offset_input_rois[1])) * spatial_scale; + T roi_end_w = + static_cast(round(offset_input_rois[2]) + 1.) * spatial_scale; + T roi_end_h = + static_cast(round(offset_input_rois[3]) + 1.) * spatial_scale; + + // Force too small ROIs to be 1x1 + T roi_height = std::max(roi_end_h - roi_start_h, (T)0.1); // avoid 0 + T roi_width = std::max(roi_end_w - roi_start_w, (T)0.1); + + // Compute w and h at input feature map + T bin_size_h = roi_height / static_cast(pooled_height); + T bin_size_w = roi_width / static_cast(pooled_width); + + int hstart = floor(bin_size_h * static_cast(ph) + roi_start_h); + int wstart = floor(bin_size_w * static_cast(pw) + roi_start_w); + int hend = ceil(bin_size_h * static_cast(ph + 1) + roi_start_h); + int wend = ceil(bin_size_w * static_cast(pw + 1) + roi_start_w); + + // Add roi offsets and clip to input boundaries + hstart = std::min(std::max(hstart, 0), height); + hend = std::min(std::max(hend, 0), height); + wstart = std::min(std::max(wstart, 0), width); + wend = std::min(std::max(wend, 0), width); + bool is_empty = (hend <= hstart) || (wend <= wstart); + + // Accumulate diff_val into input data + T bin_area = static_cast((hend - hstart) * (wend - wstart)); + T diff_val = is_empty ? 0. 
: output_grad_data[i] / bin_area; + for (int ih = hstart; ih < hend; ++ih) { + for (int iw = wstart; iw < wend; ++iw) { + int input_index = ih * width + iw; + offset_input_grad_data[input_index] += diff_val; + } + } + } + } + return; + } +}; + +} // namespace operators +} // namespace paddle diff --git a/paddle/fluid/operators/py_func_op.cc b/paddle/fluid/operators/py_func_op.cc index 90a64333668eca9d2e8087c69879b0605e548f68..5d1aa7d7e651610b1a268482f59f3bb2e3d49209 100644 --- a/paddle/fluid/operators/py_func_op.cc +++ b/paddle/fluid/operators/py_func_op.cc @@ -40,14 +40,14 @@ static py::object *GetPythonCallableObject(size_t i) { return &g_py_callables[i]; } -std::string PythonObjectToString(const py::object &py_callable) { +static std::string PythonObjectToString(const py::object &py_callable) { py::gil_scoped_acquire guard; return py::str(*py_callable); } -void CallPythonFunc(py::object *callable, - const std::vector &ins, - std::vector *out) { +static void CallPythonFunc(py::object *callable, + const std::vector &ins, + std::vector *out) { py::gil_scoped_acquire guard; py::tuple in_args(ins.size()); for (size_t i = 0; i < ins.size(); ++i) { @@ -56,8 +56,19 @@ void CallPythonFunc(py::object *callable, auto ret = (*callable)(*in_args); auto ret_tuple = py::cast(ret); - PADDLE_ENFORCE_EQ(py::len(ret_tuple), out->size(), "Output number not match"); - for (size_t i = 0; i < out->size(); ++i) { + size_t ret_num = py::len(ret_tuple); + size_t out_num = out->size(); + if (ret_num != out_num) { + // Python function has no return values or returns None + // In this case, ret_num = 1 && ret[0] == None && out_num should be 0 + // Otherwise, ret_num must be equal to out_num + PADDLE_ENFORCE( + ret_num == 1 && out_num == 0 && + py::cast(ret_tuple[0]) == nullptr, + "Output number not match. Expected %d, actual %d", out_num, ret_num); + } + + for (size_t i = 0; i < out_num; ++i) { if ((*out)[i] == nullptr) { continue; } diff --git a/paddle/fluid/operators/reader/ctr_reader.h b/paddle/fluid/operators/reader/ctr_reader.h index 9b2a11bae12d242880829628faa089e1638424b0..7fc07efe7304701794595c9fa63f4a306d61e230 100644 --- a/paddle/fluid/operators/reader/ctr_reader.h +++ b/paddle/fluid/operators/reader/ctr_reader.h @@ -16,6 +16,7 @@ #include +#include #include // NOLINT #include #include @@ -55,8 +56,7 @@ class CTRReader : public framework::FileReader { PADDLE_ENFORCE_GT(thread_num, 0, "thread num should be larger then 0!"); PADDLE_ENFORCE(queue != nullptr, "LoDTensorBlockingQueue must not be null"); PADDLE_ENFORCE_GT(file_list.size(), 0, "file list should not be empty"); - thread_num_ = - file_list_.size() > thread_num ? 
thread_num : file_list_.size(); + thread_num_ = std::min(file_list_.size(), thread_num); queue_ = queue; SplitFiles(); for (size_t i = 0; i < thread_num_; ++i) { @@ -95,10 +95,10 @@ class CTRReader : public framework::FileReader { queue_->ReOpen(); VLOG(3) << "reopen success"; VLOG(3) << "thread_num " << thread_num_; - for (int thread_id = 0; thread_id < thread_num_; thread_id++) { - read_threads_.emplace_back(new std::thread( - std::bind(&ReadThread, file_groups_[thread_id], slots_, batch_size_, - thread_id, &read_thread_status_, queue_))); + for (size_t thread_id = 0; thread_id < thread_num_; thread_id++) { + read_threads_.emplace_back(new std::thread(std::bind( + &ReadThread, file_groups_[thread_id], slots_, batch_size_, + static_cast(thread_id), &read_thread_status_, queue_))); } monitor_thread_.reset(new std::thread( std::bind(&MonitorThread, &read_thread_status_, queue_))); diff --git a/paddle/fluid/platform/CMakeLists.txt b/paddle/fluid/platform/CMakeLists.txt index 93cb5eb2dc0b3480ebd05dcc6b36d8915d057bab..23c7ebe84221986a5f7ac7583c3a8e17d04fe4af 100644 --- a/paddle/fluid/platform/CMakeLists.txt +++ b/paddle/fluid/platform/CMakeLists.txt @@ -56,9 +56,16 @@ ELSE() set(MKLDNN_CTX_DEPS) ENDIF() +nv_library(stream_callback_manager SRCS stream_callback_manager.cc DEPS simple_threadpool enforce) +IF(WITH_GPU) + set(STREAM_CALLBACK_DEPS stream_callback_manager) +ELSE() + set(STREAM_CALLBACK_DEPS) +ENDIF() + # memcpy depends on device_context, here add deps individually for # avoiding cycle dependencies -cc_library(device_context SRCS device_context.cc init.cc DEPS simple_threadpool malloc +cc_library(device_context SRCS device_context.cc init.cc DEPS simple_threadpool malloc ${STREAM_CALLBACK_DEPS} place eigen3 stringpiece cpu_helper cpu_info framework_proto ${GPU_CTX_DEPS} ${MKLDNN_CTX_DEPS}) nv_test(device_context_test SRCS device_context_test.cu DEPS device_context gpu_info) diff --git a/paddle/fluid/platform/device_context.h b/paddle/fluid/platform/device_context.h index ce1494f1702aa2bbcb255dec122cdae776cbc4a0..812e56f1f966d03207cf83ad47cb88e9fa5d55bb 100644 --- a/paddle/fluid/platform/device_context.h +++ b/paddle/fluid/platform/device_context.h @@ -222,14 +222,10 @@ class CUDADeviceContext : public DeviceContext { template void AddStreamCallback(Callback&& callback) const { - std::lock_guard guard(callback_mtx_); callback_manager_->AddCallback(callback); } - void WaitStreamCallback() const { - std::lock_guard guard(callback_mtx_); - callback_manager_->Wait(); - } + void WaitStreamCallback() const { callback_manager_->Wait(); } #if CUDA_VERSION >= 9000 /*! \brief CublasCall may need to change cublas's config, @@ -260,9 +256,7 @@ class CUDADeviceContext : public DeviceContext { mutable std::mutex mtx_; - // This lock is only used by callback - // If we use mtx_ for StreamCallbackManager, deadlock may occur sometimes - mutable std::mutex callback_mtx_; + // StreamCallbackManager is thread-safe std::unique_ptr callback_manager_; mutable std::mutex cublas_mtx_; diff --git a/paddle/fluid/platform/stream_callback_manager.cc b/paddle/fluid/platform/stream_callback_manager.cc new file mode 100644 index 0000000000000000000000000000000000000000..466c77469ef256179c52442d21c1d62dfc4ef1bb --- /dev/null +++ b/paddle/fluid/platform/stream_callback_manager.cc @@ -0,0 +1,63 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/platform/stream_callback_manager.h" +#include "paddle/fluid/platform/enforce.h" + +namespace paddle { +namespace platform { + +#if CUDA_VERSION >= 10000 +static void CUDART_CB StreamCallbackFunc(void *user_data); +#else +static void CUDART_CB StreamCallbackFunc(cudaStream_t stream, + cudaError_t status, void *user_data) +#endif +{ + std::unique_ptr> func( + reinterpret_cast *>(user_data)); + (*func)(); +} + +StreamCallbackManager::StreamCallbackManager(const cudaStream_t stream) + : stream_(stream), thread_pool_(1) {} + +void StreamCallbackManager::AddCallback(std::function callback) const { + auto *callback_func = new std::function(std::move(callback)); + auto *func = new std::function([this, callback_func] { + std::lock_guard lock(mtx_); + last_future_ = thread_pool_.enqueue([callback_func] { + std::unique_ptr> releaser(callback_func); + (*callback_func)(); + }); + }); +#if CUDA_VERSION >= 10000 + PADDLE_ENFORCE(cudaLaunchHostFunc(stream_, StreamCallbackFunc, func)); +#else + PADDLE_ENFORCE(cudaStreamAddCallback(stream_, StreamCallbackFunc, func, 0)); +#endif +} + +void StreamCallbackManager::Wait() const { + PADDLE_ENFORCE(cudaStreamSynchronize(stream_)); + { + std::lock_guard lock(mtx_); + if (last_future_.valid()) { + last_future_.wait(); + } + } +} + +} // namespace platform +} // namespace paddle diff --git a/paddle/fluid/platform/stream_callback_manager.h b/paddle/fluid/platform/stream_callback_manager.h index ed8734c98cb996721a3002523c9276e6d7f492ae..8668bcb1131719e882ecbccb08ad00b63409eb28 100644 --- a/paddle/fluid/platform/stream_callback_manager.h +++ b/paddle/fluid/platform/stream_callback_manager.h @@ -18,67 +18,32 @@ #include #include #include +#include // NOLINT #include +#include // NOLINT + #include "paddle/fluid/platform/enforce.h" namespace paddle { namespace platform { -class StreamCallbackManager; - -struct StreamCallbackContext { - template - inline StreamCallbackContext(const StreamCallbackManager *manager, - Callback &&callback) - : manager_(manager), callback_(callback) {} - - const StreamCallbackManager *manager_; // do not own - std::function callback_; -}; - +// NOTE(zjl): clean StreamCallbackManager to make compilation faster +// Make StreamCallbackManager thread-safe class StreamCallbackManager { public: - explicit inline StreamCallbackManager(cudaStream_t stream = nullptr) - : stream_(stream), thread_pool_(new ThreadPool(1)) {} + explicit StreamCallbackManager(const cudaStream_t stream); + + ~StreamCallbackManager() = default; - template - inline void AddCallback(Callback &&callback) const { - auto *stream_callback_context = - new StreamCallbackContext(this, std::forward(callback)); -#if CUDA_VERSION >= 10000 - PADDLE_ENFORCE(cudaLaunchHostFunc(stream_, - StreamCallbackManager::StreamCallbackFunc, - stream_callback_context)); // NOLINT -#else - PADDLE_ENFORCE(cudaStreamAddCallback( - stream_, StreamCallbackManager::StreamCallbackFunc, - stream_callback_context, 0)); // NOLINT -#endif - } + void AddCallback(std::function callback) const; - void Wait() const { thread_pool_.reset(new ThreadPool(1)); } + void Wait() 
const; private: const cudaStream_t stream_; - mutable std::unique_ptr thread_pool_; - -// cudaStreamCallback cannot call CUDA API inside, so we have to use -// thread_pool here -#if CUDA_VERSION >= 10000 - static void CUDART_CB StreamCallbackFunc(void *user_data) -#else - static void CUDART_CB StreamCallbackFunc(cudaStream_t stream, - cudaError_t status, void *user_data) -#endif - { - auto *callback_context_ptr = - reinterpret_cast(user_data); - callback_context_ptr->manager_->thread_pool_->enqueue([=]() { - std::unique_ptr callback_context( - callback_context_ptr); - callback_context->callback_(); - }); - } + mutable ::ThreadPool thread_pool_; + mutable std::mutex mtx_; + mutable std::future last_future_; }; } // namespace platform diff --git a/paddle/fluid/pybind/tensor_py.h b/paddle/fluid/pybind/tensor_py.h index f67f40f19f676054e0ab140f12770f01624fac02..24800e17098759082fd047e51a10fa40ff48b961 100644 --- a/paddle/fluid/pybind/tensor_py.h +++ b/paddle/fluid/pybind/tensor_py.h @@ -162,7 +162,7 @@ void PyCPUTensorSetFromArray( paddle::platform::CPUPlace place) { std::vector dims; dims.reserve(array.ndim()); - for (size_t i = 0; i < array.ndim(); ++i) { + for (decltype(array.ndim()) i = 0; i < array.ndim(); ++i) { dims.push_back(static_cast(array.shape()[i])); } @@ -182,7 +182,7 @@ inline void PyCPUTensorSetFromArray( paddle::platform::CPUPlace place) { std::vector dims; dims.reserve(array.ndim()); - for (int i = 0; i < array.ndim(); ++i) { + for (decltype(array.ndim()) i = 0; i < array.ndim(); ++i) { dims.push_back(static_cast(array.shape()[i])); } @@ -200,7 +200,7 @@ void PyCUDATensorSetFromArray( paddle::platform::CUDAPlace place) { std::vector dims; dims.reserve(array.ndim()); - for (size_t i = 0; i < array.ndim(); ++i) { + for (decltype(array.ndim()) i = 0; i < array.ndim(); ++i) { dims.push_back(static_cast(array.shape()[i])); } @@ -221,7 +221,7 @@ inline void PyCUDATensorSetFromArray( paddle::platform::CUDAPlace place) { std::vector dims; dims.reserve(array.ndim()); - for (size_t i = 0; i < array.ndim(); ++i) { + for (decltype(array.ndim()) i = 0; i < array.ndim(); ++i) { dims.push_back(static_cast(array.shape()[i])); } @@ -240,7 +240,7 @@ void PyCUDAPinnedTensorSetFromArray( const paddle::platform::CUDAPinnedPlace &place) { std::vector dims; dims.reserve(array.ndim()); - for (size_t i = 0; i < array.ndim(); ++i) { + for (decltype(array.ndim()) i = 0; i < array.ndim(); ++i) { dims.push_back(static_cast(array.shape()[i])); } @@ -260,7 +260,7 @@ inline void PyCUDAPinnedTensorSetFromArray( const paddle::platform::CUDAPinnedPlace &place) { std::vector dims; dims.reserve(array.ndim()); - for (size_t i = 0; i < array.ndim(); ++i) { + for (decltype(array.ndim()) i = 0; i < array.ndim(); ++i) { dims.push_back(static_cast(array.shape()[i])); } diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index a532f94c6dd08de68c56e7af974dbf9a371cf121..e0bb0d1152b258f9b16bb9063acf0ac6012d3432 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -126,9 +126,9 @@ def __bootstrap__(): 'check_nan_inf', 'benchmark', 'eager_delete_scope', 'use_mkldnn', 'use_ngraph', 'initial_cpu_memory_in_mb', 'init_allocated_mem', 'free_idle_memory', 'paddle_num_threads', "dist_threadpool_size", - 'eager_delete_tensor_gb', 'allocator_strategy', - 'reader_queue_speed_test_mode', 'print_sub_graph_dir', - 'pe_profile_fname' + 'eager_delete_tensor_gb', 'fast_eager_deletion_mode', + 'allocator_strategy', 'reader_queue_speed_test_mode', + 'print_sub_graph_dir', 
diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py
index a532f94c6dd08de68c56e7af974dbf9a371cf121..e0bb0d1152b258f9b16bb9063acf0ac6012d3432 100644
--- a/python/paddle/fluid/__init__.py
+++ b/python/paddle/fluid/__init__.py
@@ -126,9 +126,9 @@ def __bootstrap__():
         'check_nan_inf', 'benchmark', 'eager_delete_scope', 'use_mkldnn',
         'use_ngraph', 'initial_cpu_memory_in_mb', 'init_allocated_mem',
         'free_idle_memory', 'paddle_num_threads', "dist_threadpool_size",
-        'eager_delete_tensor_gb', 'allocator_strategy',
-        'reader_queue_speed_test_mode', 'print_sub_graph_dir',
-        'pe_profile_fname'
+        'eager_delete_tensor_gb', 'fast_eager_deletion_mode',
+        'allocator_strategy', 'reader_queue_speed_test_mode',
+        'print_sub_graph_dir', 'pe_profile_fname'
     ]
     if 'Darwin' not in sysstr:
         read_env_flags.append('use_pinned_memory')
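An aside on the two flags registered above (not part of the patch): they are read from the environment by `__bootstrap__()` when `paddle.fluid` is first imported, so they must be set beforehand, exactly as the new unit tests below do. A minimal sketch; the value format accepted by `FLAGS_fast_eager_deletion_mode` is assumed to be boolean here:

```python
import os

# Flags are consumed at import time, so set them before importing fluid.
os.environ['FLAGS_eager_delete_tensor_gb'] = '0.0'  # 0.0 GB threshold: free memory eagerly
os.environ['FLAGS_fast_eager_deletion_mode'] = 'True'  # assumed boolean flag

import paddle.fluid as fluid  # noqa: E402 (import deliberately after setting flags)
```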
diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py
index debe0ff0c94b6fa66f9b1dbeb8327b191082774f..db7ec9d021f79f55ce4632ffef6d17204ca14433 100644
--- a/python/paddle/fluid/layers/nn.py
+++ b/python/paddle/fluid/layers/nn.py
@@ -176,6 +176,7 @@ __all__ = [
     'get_tensor_from_selected_rows',
     'lstm',
     'py_func',
+    'psroi_pool',
 ]
 
 kIgnoreIndex = -100
@@ -9127,11 +9128,11 @@ def get_tensor_from_selected_rows(x, name=None):
     return out
 
 
-class PyFuncWrapper(object):
+class PyFuncRegistry(object):
     _register_funcs = []
 
     def __init__(self, func):
-        if func is None or not hasattr(func, '__call__'):
+        if func is None or not callable(func):
             raise TypeError('func must be a Python function')
 
         self._func = func
@@ -9143,7 +9144,7 @@ class PyFuncWrapper(object):
         1. For debug usage. Users can call
            :code:`py_func.registered_func(idx)` method
-           to find the registered function coresponding
+           to find the registered function corresponding
            to :code:`idx`.
 
         2. For increasing reference count of self.
@@ -9152,7 +9153,7 @@ class PyFuncWrapper(object):
            segmentation fault error in C++ side. May be lack
           of Python GC in C++ side?
        '''
-        PyFuncWrapper._register_funcs.append(self)
+        PyFuncRegistry._register_funcs.append(self)
 
     @classmethod
     def registered_func(cls, idx):
@@ -9249,8 +9250,8 @@ def py_func(func, x, out, backward_func=None, skip_vars_in_backward_input=None):
         raise TypeError(
             'Output must be Variable/list(Variable)/tuple(Variable)')
 
-    fwd_func_id = PyFuncWrapper(func).id
-    bwd_func_id = PyFuncWrapper(
+    fwd_func_id = PyFuncRegistry(func).id
+    bwd_func_id = PyFuncRegistry(
         backward_func).id if backward_func is not None else -1
 
     for each_out in out_list:
@@ -9288,5 +9289,59 @@ def py_func(func, x, out, backward_func=None, skip_vars_in_backward_input=None):
 
 
 # For debug usage
-py_func.registered_func = PyFuncWrapper.registered_func
-py_func.registered_func_num = PyFuncWrapper.registered_func_num
+py_func.registered_func = PyFuncRegistry.registered_func
+py_func.registered_func_num = PyFuncRegistry.registered_func_num
+
+
+@templatedoc()
+def psroi_pool(input,
+               rois,
+               output_channels,
+               spatial_scale,
+               pooled_height,
+               pooled_width,
+               name=None):
+    """
+    ${comment}
+
+    Args:
+        input (Variable): ${x_comment}
+        rois (Variable): ROIs (Regions of Interest) to pool over.
+        output_channels (integer): ${output_channels_comment}
+        spatial_scale (float): ${spatial_scale_comment} Default: 1.0
+        pooled_height (integer): ${pooled_height_comment} Default: 1
+        pooled_width (integer): ${pooled_width_comment} Default: 1
+        name (str, default None): The name of this layer.
+
+    Returns:
+        Variable: ${out_comment}.
+
+    Examples:
+        .. code-block:: python
+
+            pool_out = fluid.layers.psroi_pool(x, rois, 490, 1.0, 7, 7)
+    """
+    helper = LayerHelper('psroi_pool', **locals())
+    # check attrs
+    if not isinstance(output_channels, int):
+        raise TypeError("output_channels must be int type")
+    if not isinstance(spatial_scale, float):
+        raise TypeError("spatial_scale must be float type")
+    if not isinstance(pooled_height, int):
+        raise TypeError("pooled_height must be int type")
+    if not isinstance(pooled_width, int):
+        raise TypeError("pooled_width must be int type")
+    dtype = helper.input_dtype()
+    out = helper.create_variable_for_type_inference(dtype)
+    helper.append_op(
+        type='psroi_pool',
+        inputs={'X': input,
+                'ROIs': rois},
+        outputs={'Out': out},
+        attrs={
+            'output_channels': output_channels,
+            'spatial_scale': spatial_scale,
+            'pooled_height': pooled_height,
+            'pooled_width': pooled_width
+        })
+    return out
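As a usage note for the new layer (not part of the patch), a slightly fuller sketch than the docstring example; the shapes mirror the `test_psroi_pool` case added to `test_layers.py` further down, and the key constraint is that the input channel count must equal `output_channels * pooled_height * pooled_width` (here 245 = 5 × 7 × 7):

```python
import paddle.fluid as fluid

# Input feature map: 245 channels, since psroi_pool consumes
# output_channels * pooled_height * pooled_width position-specific channels.
x = fluid.layers.data(name='x', shape=[245, 30, 30], dtype='float32')

# ROIs as an LoD tensor of [x1, y1, x2, y2] boxes.
rois = fluid.layers.data(
    name='rois', shape=[4], dtype='float32', lod_level=1)

# Output shape: (num_rois, 5, 7, 7).
pool_out = fluid.layers.psroi_pool(
    input=x, rois=rois, output_channels=5, spatial_scale=0.25,
    pooled_height=7, pooled_width=7)
```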
+
+import os
+os.environ['FLAGS_eager_delete_tensor_gb'] = '0.0'
+os.environ['CPU_NUM'] = '2'
+
+import six
+import unittest
+
+import paddle
+import paddle.fluid.core as core
+import paddle.fluid as fluid
+
+
+def train(network, use_cuda, use_parallel_executor, batch_size=32, pass_num=2):
+    if use_cuda and not core.is_compiled_with_cuda():
+        print('Skip use_cuda=True because Paddle is not compiled with cuda')
+        return
+
+    word_dict = paddle.dataset.imdb.word_dict()
+    train_reader = paddle.batch(
+        paddle.dataset.imdb.train(word_dict), batch_size=batch_size)
+
+    data = fluid.layers.data(
+        name="words", shape=[1], dtype="int64", lod_level=1)
+
+    label = fluid.layers.data(name="label", shape=[1], dtype="int64")
+
+    cost = network(data, label, len(word_dict))
+    optimizer = fluid.optimizer.Adagrad(learning_rate=0.2)
+    optimizer.minimize(cost)
+
+    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+    feeder = fluid.DataFeeder(feed_list=[data, label], place=place)
+    reader = feeder.decorate_reader(
+        train_reader, multi_devices=use_parallel_executor)
+
+    exe = fluid.Executor(place)
+    exe.run(fluid.default_startup_program())
+
+    if use_parallel_executor:
+        train_exe = fluid.ParallelExecutor(
+            use_cuda=use_cuda, loss_name=cost.name)
+        fetch_list = [cost.name]
+    else:
+        train_exe = exe
+        fetch_list = [cost]
+
+    for pass_id in six.moves.xrange(pass_num):
+        batch_id = 0
+        for data in reader():
+            train_exe.run(feed=data,
+                          fetch_list=fetch_list if batch_id % 4 == 0 else [])
+            batch_id += 1
+            if batch_id > 16:
+                break
+
+
+class TestBase(unittest.TestCase):
+    def setUp(self):
+        self.net = None
+
+    def test_network(self):
+        if self.net is None:
+            return
+
+        for use_cuda in [True, False]:
+            for use_parallel_executor in [False, True]:
+                print('network: {}, use_cuda: {}, use_parallel_executor: {}'.
+                      format(self.net.__name__, use_cuda,
+                             use_parallel_executor))
+                with fluid.program_guard(fluid.Program(), fluid.Program()):
+                    with fluid.scope_guard(core.Scope()):
+                        train(self.net, use_cuda, use_parallel_executor)
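The `TestBase` harness above is parameterized only by `self.net`; the GRU and LSTM tests that follow plug concrete networks into it. As an illustration of the pattern (a hypothetical extra test, not part of this patch; `bow_net` here is a trimmed-down version of the BOW model used in `test_regularizer.py` later in this patch):

```python
import unittest
from test_eager_deletion_dynamic_rnn_base import TestBase
import paddle.fluid as fluid


# Any callable with the (data, label, dict_dim) signature used by train()
# can be plugged into the harness.
def bow_net(data, label, dict_dim, emb_dim=128, class_dim=2):
    emb = fluid.layers.embedding(input=data, size=[dict_dim, emb_dim])
    bow = fluid.layers.sequence_pool(input=emb, pool_type='sum')
    prediction = fluid.layers.fc(input=bow, size=class_dim, act='softmax')
    cost = fluid.layers.cross_entropy(input=prediction, label=label)
    return fluid.layers.mean(x=cost)


class BowTest(TestBase):
    def setUp(self):
        self.net = bow_net


if __name__ == '__main__':
    unittest.main()
```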
diff --git a/python/paddle/fluid/tests/unittests/test_eager_deletion_gru_net.py b/python/paddle/fluid/tests/unittests/test_eager_deletion_gru_net.py
new file mode 100644
index 0000000000000000000000000000000000000000..5ed3d9fdf3bf765f1b9ef8ba1ef2a5795f1874c7
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_eager_deletion_gru_net.py
@@ -0,0 +1,49 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from test_eager_deletion_dynamic_rnn_base import TestBase
+import paddle.fluid as fluid
+
+
+def gru_net(data,
+            label,
+            dict_dim,
+            emb_dim=128,
+            hid_dim=128,
+            hid_dim2=96,
+            class_dim=2,
+            emb_lr=400.0):
+    emb = fluid.layers.embedding(
+        input=data,
+        size=[dict_dim, emb_dim],
+        param_attr=fluid.ParamAttr(learning_rate=emb_lr))
+    fc0 = fluid.layers.fc(input=emb, size=hid_dim * 3)
+    gru_h = fluid.layers.dynamic_gru(input=fc0, size=hid_dim, is_reverse=False)
+    gru_max = fluid.layers.sequence_pool(input=gru_h, pool_type='max')
+    gru_max_tanh = fluid.layers.tanh(gru_max)
+    fc1 = fluid.layers.fc(input=gru_max_tanh, size=hid_dim2, act='tanh')
+    prediction = fluid.layers.fc(input=fc1, size=class_dim, act='softmax')
+    cost = fluid.layers.cross_entropy(input=prediction, label=label)
+    avg_cost = fluid.layers.mean(x=cost)
+    return avg_cost
+
+
+class GRUTest(TestBase):
+    def setUp(self):
+        self.net = gru_net
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_eager_deletion_lstm_net.py b/python/paddle/fluid/tests/unittests/test_eager_deletion_lstm_net.py
new file mode 100644
index 0000000000000000000000000000000000000000..8462c06aa56e0469fd06c7dc4b2ed514f7eb51ba
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_eager_deletion_lstm_net.py
@@ -0,0 +1,50 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from test_eager_deletion_dynamic_rnn_base import TestBase
+import paddle.fluid as fluid
+import unittest
+
+
+def lstm_net(data,
+             label,
+             dict_dim,
+             emb_dim=128,
+             hid_dim=128,
+             hid_dim2=96,
+             class_dim=2,
+             emb_lr=30.0):
+    emb = fluid.layers.embedding(
+        input=data,
+        size=[dict_dim, emb_dim],
+        param_attr=fluid.ParamAttr(learning_rate=emb_lr))
+    fc0 = fluid.layers.fc(input=emb, size=hid_dim * 4)
+    lstm_h, c = fluid.layers.dynamic_lstm(
+        input=fc0, size=hid_dim * 4, is_reverse=False)
+    lstm_max = fluid.layers.sequence_pool(input=lstm_h, pool_type='max')
+    lstm_max_tanh = fluid.layers.tanh(lstm_max)
+    fc1 = fluid.layers.fc(input=lstm_max_tanh, size=hid_dim2, act='tanh')
+    prediction = fluid.layers.fc(input=fc1, size=class_dim, act='softmax')
+    cost = fluid.layers.cross_entropy(input=prediction, label=label)
+    avg_cost = fluid.layers.mean(x=cost)
+    return avg_cost
+
+
+class LSTMTest(TestBase):
+    def setUp(self):
+        self.net = lstm_net
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_eager_deletion_mnist.py b/python/paddle/fluid/tests/unittests/test_eager_deletion_mnist.py
new file mode 100644
index 0000000000000000000000000000000000000000..7ec1f0ae753724dac5c4675926ead87a097a7a99
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_eager_deletion_mnist.py
@@ -0,0 +1,27 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import unittest
+os.environ['FLAGS_eager_delete_tensor_gb'] = "0.0"
+
+from test_parallel_executor_mnist import TestMNIST
+
+
+class EagerDeletionTestMNIST(TestMNIST):
+    pass
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_eager_deletion_transformer.py b/python/paddle/fluid/tests/unittests/test_eager_deletion_transformer.py
new file mode 100644
index 0000000000000000000000000000000000000000..754d5fd40953311a5deb466fa42216f72671a65a
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_eager_deletion_transformer.py
@@ -0,0 +1,27 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import unittest
+os.environ['FLAGS_eager_delete_tensor_gb'] = "0.0"
+
+from test_parallel_executor_transformer import TestTransformer
+
+
+class EagerDeletionTestTransformer(TestTransformer):
+    pass
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_layers.py b/python/paddle/fluid/tests/unittests/test_layers.py
index 10e8bb5a86691d8654c5ae48794e49f30f47500d..fb3e4da1efd32ca99f57da8f9955803ddde04f8a 100644
--- a/python/paddle/fluid/tests/unittests/test_layers.py
+++ b/python/paddle/fluid/tests/unittests/test_layers.py
@@ -511,6 +511,16 @@ class TestBook(unittest.TestCase):
         self.assertIsNotNone(output)
         print(str(program))
 
+    def test_psroi_pool(self):
+        program = Program()
+        with program_guard(program):
+            x = layers.data(name="x", shape=[245, 30, 30], dtype="float32")
+            rois = layers.data(
+                name="rois", shape=[4], dtype="float32", lod_level=1)
+            output = layers.psroi_pool(x, rois, 5, 0.25, 7, 7)
+            self.assertIsNotNone(output)
+            print(str(program))
+
     def test_roi_align(self):
         program = Program()
         with program_guard(program):
diff --git a/python/paddle/fluid/tests/unittests/test_psroi_pool_op.py b/python/paddle/fluid/tests/unittests/test_psroi_pool_op.py
new file mode 100644
index 0000000000000000000000000000000000000000..abe014a38c6ecfd008b0f1028536bfb49b628fb4
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_psroi_pool_op.py
@@ -0,0 +1,134 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import math
+import numpy as np
+import unittest
+from op_test import OpTest
+
+
+class TestPSROIPoolOp(OpTest):
+    def set_data(self):
+        self.init_test_case()
+        self.make_rois()
+        self.calc_psroi_pool()
+        self.inputs = {'X': self.x, 'ROIs': (self.rois[:, 1:5], self.rois_lod)}
+        self.attrs = {
+            'output_channels': self.output_channels,
+            'spatial_scale': self.spatial_scale,
+            'pooled_height': self.pooled_height,
+            'pooled_width': self.pooled_width
+        }
+        self.outputs = {'Out': self.outs}
+
+    def init_test_case(self):
+        self.batch_size = 3
+        self.channels = 3 * 2 * 2
+        self.height = 6
+        self.width = 4
+
+        self.x_dim = [self.batch_size, self.channels, self.height, self.width]
+
+        self.spatial_scale = 1.0 / 4.0
+        self.output_channels = 3
+        self.pooled_height = 2
+        self.pooled_width = 2
+
+        self.x = np.random.random(self.x_dim).astype('float32')
+
+    def make_rois(self):
+        rois = []
+        self.rois_lod = [[]]
+        for bno in range(self.batch_size):
+            self.rois_lod[0].append(bno + 1)
+            for i in range(bno + 1):
+                x1 = np.random.random_integers(
+                    0, self.width // self.spatial_scale - self.pooled_width)
+                y1 = np.random.random_integers(
+                    0, self.height // self.spatial_scale - self.pooled_height)
+
+                x2 = np.random.random_integers(x1 + self.pooled_width,
+                                               self.width // self.spatial_scale)
+                y2 = np.random.random_integers(
+                    y1 + self.pooled_height, self.height // self.spatial_scale)
+                roi = [bno, x1, y1, x2, y2]
+                rois.append(roi)
+        self.rois_num = len(rois)
+        self.rois = np.array(rois).astype('float32')
+
+    def calc_psroi_pool(self):
+        output_shape = (self.rois_num, self.output_channels,
+                        self.pooled_height, self.pooled_width)
+        out_data = np.zeros(output_shape)
+        for i in range(self.rois_num):
+            roi = self.rois[i]
+            roi_batch_id = int(roi[0])
+            roi_start_w = round(roi[1]) * self.spatial_scale
+            roi_start_h = round(roi[2]) * self.spatial_scale
+            roi_end_w = (round(roi[3]) + 1.) * self.spatial_scale
+            roi_end_h = (round(roi[4]) + 1.) * self.spatial_scale
+
+            roi_height = max(roi_end_h - roi_start_h, 0.1)
+            roi_width = max(roi_end_w - roi_start_w, 0.1)
+
+            bin_size_h = roi_height / float(self.pooled_height)
+            bin_size_w = roi_width / float(self.pooled_width)
+
+            x_i = self.x[roi_batch_id]
+
+            for c in range(self.output_channels):
+                for ph in range(self.pooled_height):
+                    for pw in range(self.pooled_width):
+                        hstart = int(
+                            math.floor(float(ph) * bin_size_h + roi_start_h))
+                        wstart = int(
+                            math.floor(float(pw) * bin_size_w + roi_start_w))
+                        hend = int(
+                            math.ceil(
+                                float(ph + 1) * bin_size_h + roi_start_h))
+                        wend = int(
+                            math.ceil(
+                                float(pw + 1) * bin_size_w + roi_start_w))
+                        hstart = min(max(hstart, 0), self.height)
+                        hend = min(max(hend, 0), self.height)
+                        wstart = min(max(wstart, 0), self.width)
+                        wend = min(max(wend, 0), self.width)
+
+                        c_in = (c * self.pooled_height + ph
+                                ) * self.pooled_width + pw
+                        is_empty = (hend <= hstart) or (wend <= wstart)
+                        out_sum = 0.
+                        for ih in range(hstart, hend):
+                            for iw in range(wstart, wend):
+                                out_sum += x_i[c_in, ih, iw]
+                        bin_area = (hend - hstart) * (wend - wstart)
+                        out_data[i, c, ph, pw] = 0. if is_empty else (
+                            out_sum / float(bin_area))
+        self.outs = out_data.astype('float32')
+
+    def setUp(self):
+        self.op_type = 'psroi_pool'
+        self.set_data()
+
+    def test_check_output(self):
+        self.check_output()
+
+    def test_check_grad(self):
+        self.check_grad(['X'], 'Out')
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_py_func_op.py b/python/paddle/fluid/tests/unittests/test_py_func_op.py
index c71f2bdea8391c95496493003a039907dfabdffe..491bbc219024d921da483ddbee9e75b5add11afd 100644
--- a/python/paddle/fluid/tests/unittests/test_py_func_op.py
+++ b/python/paddle/fluid/tests/unittests/test_py_func_op.py
@@ -12,12 +12,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
 import paddle.fluid as fluid
 import paddle
 import unittest
 import six
 import numpy as np
 
+dev_cnt = 2
+if fluid.core.is_compiled_with_cuda():
+    dev_cnt = fluid.core.get_cuda_device_count()
+os.environ['CPU_NUM'] = str(dev_cnt)
+
 
 def tanh(x):
     return np.tanh(x)
@@ -86,17 +92,18 @@ def simple_fc_net(img, label, use_py_func_op):
             out=loss,
             backward_func=cross_entropy_grad,
             skip_vars_in_backward_input=loss)
+        loss = fluid.layers.mean(loss)
     return loss
 
 
 def reader():
-    for _ in six.moves.range(100):
+    for _ in six.moves.range(dev_cnt * 100):
         yield np.random.random([784]), np.random.random_integers(
             size=[1], low=0, high=9)
 
 
-def test_main(use_cuda, use_py_func_op):
+def test_main(use_cuda, use_py_func_op, use_parallel_executor):
     if use_cuda and not fluid.core.is_compiled_with_cuda():
         return None
 
@@ -118,21 +125,32 @@ def test_main(use_cuda, use_py_func_op):
         exe = fluid.Executor(place)
         exe.run(fluid.default_startup_program())
 
+        if use_parallel_executor:
+            exe = fluid.ParallelExecutor(
+                use_cuda=use_cuda, loss_name=loss.name)
+            fetch_list = [loss.name]
+        else:
+            fetch_list = [loss]
+
         ret = []
         for epoch_id in six.moves.range(2):
             for d in r():
-                L, = exe.run(feed=feeder.feed(d), fetch_list=[loss])
-                ret.append(L[0])
+                L, = exe.run(feed=feeder.feed(d), fetch_list=fetch_list)
+                ret.append(L)
         return np.array(ret)
 
 
-class TestPyFuncOp(unittest.TestCase):
+class TestPyFuncOpUseExecutor(unittest.TestCase):
+    def setUp(self):
+        self.use_parallel_executor = False
+
     def test_loss_diff(self):
         losses = []
         for use_cuda in [True, False]:
             for use_py_func_op in [True, False]:
-                L = test_main(use_cuda, use_py_func_op)
+                L = test_main(use_cuda, use_py_func_op,
+                              self.use_parallel_executor)
                 if L is not None:
                     losses.append(L)
 
@@ -141,5 +159,10 @@ class TestPyFuncOp(unittest.TestCase):
         self.assertAlmostEqual(max_diff, 0, delta=1e-3)
 
 
+class TestPyFuncOpUseParallelExecutor(TestPyFuncOpUseExecutor):
+    def setUp(self):
+        self.use_parallel_executor = True
+
+
 if __name__ == '__main__':
     unittest.main()
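For context (not part of the patch), a minimal sketch of the `py_func` API this test exercises. The `tanh_grad` body and all names are illustrative; the output variable must be created by hand because `py_func` cannot infer shapes or dtypes, which is the same pattern `simple_fc_net` uses above:

```python
import numpy as np
import paddle.fluid as fluid


def tanh(x):
    return np.tanh(x)


def tanh_grad(y, dy):
    # Illustrative backward: d tanh(x)/dx = 1 - tanh(x)^2, expressed via y.
    return np.array(dy) * (1 - np.square(np.array(y)))


x = fluid.layers.data(name='x', shape=[32], dtype='float32')
# py_func does not infer the output shape/dtype, so create it up front.
out = fluid.default_main_program().current_block().create_var(
    name='tanh_out', dtype='float32', shape=[-1, 32])
fluid.layers.py_func(func=tanh, x=x, out=out, backward_func=tanh_grad)
```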
diff --git a/python/paddle/fluid/tests/unittests/test_regularizer.py b/python/paddle/fluid/tests/unittests/test_regularizer.py
index 20f91cf4485f2e79c20fe90143c8b7deebb9fc49..62994eec7e7f56267a0990d9a5e3b5c62d7d5fe4 100644
--- a/python/paddle/fluid/tests/unittests/test_regularizer.py
+++ b/python/paddle/fluid/tests/unittests/test_regularizer.py
@@ -15,7 +15,12 @@
 from __future__ import print_function
 
 import unittest
-
+from functools import partial
+import contextlib
+import numpy as np
+import paddle
+import paddle.fluid.core as core
+import paddle.fluid as fluid
 import paddle.fluid.framework as framework
 import paddle.fluid.optimizer as optimizer
 import paddle.fluid.regularizer as regularizer
@@ -97,5 +102,134 @@ class TestL1DecayRegularizer(unittest.TestCase):
         self.assertEqual(block.ops[-3].type, 'sign')
 
 
+def bow_net(data,
+            label,
+            dict_dim,
+            is_sparse=False,
+            emb_dim=128,
+            hid_dim=128,
+            hid_dim2=96,
+            class_dim=2):
+    """
+    BOW net
+    This model is from https://github.com/PaddlePaddle/models:
+    fluid/PaddleNLP/text_classification/nets.py
+    """
+    emb = fluid.layers.embedding(
+        input=data, is_sparse=is_sparse, size=[dict_dim, emb_dim])
+    bow = fluid.layers.sequence_pool(input=emb, pool_type='sum')
+    bow_tanh = fluid.layers.tanh(bow)
+    fc_1 = fluid.layers.fc(input=bow_tanh, size=hid_dim, act="tanh")
+    fc_2 = fluid.layers.fc(input=fc_1, size=hid_dim2, act="tanh")
+    prediction = fluid.layers.fc(input=[fc_2], size=class_dim, act="softmax")
+    cost = fluid.layers.cross_entropy(input=prediction, label=label)
+    avg_cost = fluid.layers.mean(x=cost)
+
+    return avg_cost
+
+
+class TestRegularizer(unittest.TestCase):
+    def setUp(self):
+        self.word_dict = paddle.dataset.imdb.word_dict()
+        reader = paddle.batch(
+            paddle.dataset.imdb.train(self.word_dict), batch_size=8)()
+        self.train_data = [next(reader) for _ in range(5)]
+
+    def get_places(self):
+        places = [core.CPUPlace()]
+        if core.is_compiled_with_cuda():
+            places.append(core.CUDAPlace(0))
+        return places
+
+    @contextlib.contextmanager
+    def scope_prog_guard(self, main_prog, startup_prog):
+        scope = fluid.core.Scope()
+        with fluid.unique_name.guard():
+            with fluid.scope_guard(scope):
+                with fluid.program_guard(main_prog, startup_prog):
+                    yield
+
+    def run_program(self, place, feed_list):
+        exe = fluid.Executor(place)
+        feeder = fluid.DataFeeder(feed_list=feed_list, place=place)
+        exe.run(fluid.default_startup_program())
+
+        main_prog = fluid.default_main_program()
+        param_list = [var.name for var in main_prog.block(0).all_parameters()]
+
+        param_sum = []
+        for data in self.train_data:
+            out = exe.run(main_prog,
+                          feed=feeder.feed(data),
+                          fetch_list=param_list)
+            p_sum = 0
+            for v in out:
+                p_sum += np.sum(np.abs(v))
+            param_sum.append(p_sum)
+        return param_sum
+
+    def check_l2decay_regularizer(self, place, model):
+        main_prog = fluid.framework.Program()
+        startup_prog = fluid.framework.Program()
+        startup_prog.random_seed = 1
+        with self.scope_prog_guard(
+                main_prog=main_prog, startup_prog=startup_prog):
+            data = fluid.layers.data(
+                name="words", shape=[1], dtype="int64", lod_level=1)
+            label = fluid.layers.data(name="label", shape=[1], dtype="int64")
+
+            avg_cost = model(data, label, len(self.word_dict))
+
+            optimizer = fluid.optimizer.Adagrad(
+                learning_rate=0.1,
+                regularization=fluid.regularizer.L2Decay(1.0))
+            optimizer.minimize(avg_cost)
+            param_sum = self.run_program(place, [data, label])
+        return param_sum
+
+    def check_l2decay(self, place, model):
+        main_prog = fluid.framework.Program()
+        startup_prog = fluid.framework.Program()
+        startup_prog.random_seed = 1
+        with self.scope_prog_guard(
+                main_prog=main_prog, startup_prog=startup_prog):
+            data = fluid.layers.data(
+                name="words", shape=[1], dtype="int64", lod_level=1)
+            label = fluid.layers.data(name="label", shape=[1], dtype="int64")
+
+            avg_cost_l2 = model(data, label, len(self.word_dict))
+
+            param_list = fluid.default_main_program().block(0).all_parameters()
+            para_sum = []
+            for para in param_list:
+                para_mul = fluid.layers.square(x=para)
+                para_sum.append(fluid.layers.reduce_sum(input=para_mul))
+            avg_cost_l2 += fluid.layers.sums(para_sum) * .5
+
+            optimizer = fluid.optimizer.Adagrad(learning_rate=0.1)
+            optimizer.minimize(avg_cost_l2)
+            param_sum = self.run_program(place, [data, label])
+        return param_sum
+
+    def test_l2(self):
+        for place in self.get_places():
+            dense_sparse_p_sum = []
+            for sparse in [True, False]:
+                model = partial(bow_net, is_sparse=sparse)
+                framework_l2 = self.check_l2decay_regularizer(place, model)
+                l2 = self.check_l2decay(place, model)
+                assert len(l2) == len(framework_l2)
+                for i in range(len(l2)):
+                    assert np.isclose(a=framework_l2[i], b=l2[i], rtol=5e-5)
+                dense_sparse_p_sum.append(framework_l2)
+
+            assert len(dense_sparse_p_sum[0]) == len(dense_sparse_p_sum[1])
+            for i in range(len(dense_sparse_p_sum[0])):
+                assert np.isclose(
+                    a=dense_sparse_p_sum[0][i],
+                    b=dense_sparse_p_sum[1][i],
+                    rtol=5e-5)
+
+
 if __name__ == '__main__':
     unittest.main()
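As a closing note (not part of the patch), the equivalence that `test_l2` relies on can be checked with plain NumPy: `L2Decay(coeff)` contributes `coeff * w` to each parameter's gradient, which is exactly the gradient of the `0.5 * sum(w ** 2)` term that `check_l2decay` adds to the loss by hand (with `coeff = 1.0` in the test):

```python
import numpy as np

coeff = 1.0
w = np.random.rand(4).astype('float32')      # a parameter
grad = np.random.rand(4).astype('float32')   # its data gradient

grad_with_decay = grad + coeff * w            # what the L2Decay regularizer appends
loss_term_grad = coeff * w                    # d/dw [0.5 * coeff * sum(w ** 2)]
assert np.allclose(grad_with_decay - grad, loss_term_grad)
```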