diff --git a/cmake/external/mkldnn.cmake b/cmake/external/mkldnn.cmake index 20dda35c5ccd98f5672d867c26ab97a215483543..260985cc8aa4ad0f231798666c048703b64c6d15 100644 --- a/cmake/external/mkldnn.cmake +++ b/cmake/external/mkldnn.cmake @@ -24,7 +24,7 @@ SET(MKLDNN_INSTALL_DIR ${THIRD_PARTY_PATH}/install/mkldnn) SET(MKLDNN_INC_DIR "${MKLDNN_INSTALL_DIR}/include" CACHE PATH "mkldnn include directory." FORCE) IF(WIN32 OR APPLE) - MESSAGE(WARNING + MESSAGE(WARNING "Windows or Mac is not supported with MKLDNN in Paddle yet." "Force WITH_MKLDNN=OFF") SET(WITH_MKLDNN OFF CACHE STRING "Disable MKLDNN in Windows and MacOS" FORCE) @@ -57,8 +57,10 @@ ExternalProject_Add( GIT_TAG "a29d8487a63afca3d5b8c5bbdbb473cf8ccc6e51" PREFIX ${MKLDNN_SOURCES_DIR} UPDATE_COMMAND "" + CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} + CMAKE_ARGS -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${MKLDNN_INSTALL_DIR} - CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} + CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} CMAKE_ARGS -DMKLROOT=${MKLML_ROOT} CMAKE_ARGS -DCMAKE_C_FLAGS=${MKLDNN_CFLAG} CMAKE_ARGS -DCMAKE_CXX_FLAGS=${MKLDNN_CXXFLAG} diff --git a/doc/fluid/design/ir/draft.md b/doc/fluid/design/ir/overview.md similarity index 97% rename from doc/fluid/design/ir/draft.md rename to doc/fluid/design/ir/overview.md index c29337cba1fe859e4968cb800e4e7d9ff6a54d31..83ef97c99efeaf27a27f93f0cd3857c0f1bc812e 100644 --- a/doc/fluid/design/ir/draft.md +++ b/doc/fluid/design/ir/overview.md @@ -177,8 +177,8 @@ graph = PassRegistry::Instance().Get("op_fuse_pass").Apply(std::move(grah)); auto mem_opt_pass = PassRegistry::Instance().Get("memory_optimization_pass"); mem_opt_pass.SetNotOwned("optimize_level", 1); mem_opt_pass->Apply(std::move(graph)); -graph = PassRegistry::Instance().Get("multi_device_pass").Apply(std::move(grah)); -graph = PassRegistry::Instance().Get("multi_device_check_pass").Apply(std::move(grah)); +graph = PassRegistry::Instance().Get("multi_devices_pass").Apply(std::move(grah)); +graph = PassRegistry::Instance().Get("multi_devices_check_pass").Apply(std::move(grah)); Executor exe; exe.Run(graph); diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 6440607dbe4666ff3ff91dc526465706b3b9c1f0..1d62792b80dd002b894da28be9162fc7d3ce054e 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -100,7 +100,7 @@ else() endif() -cc_library(parallel_executor SRCS parallel_executor.cc DEPS threaded_ssa_graph_executor scope_buffered_ssa_graph_executor graph graph_viz_pass multi_devices_graph_builder ssa_graph_printer ssa_graph_checker) +cc_library(parallel_executor SRCS parallel_executor.cc DEPS threaded_ssa_graph_executor scope_buffered_ssa_graph_executor graph graph_viz_pass multi_devices_graph_pass multi_devices_graph_print_pass multi_devices_graph_check_pass) cc_library(prune SRCS prune.cc DEPS framework_proto) cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context) diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index 5d652d37307d0a55ffee14930ae180dcd3e27841..8f6c4163d6ee11fbe83f603f6148c2ac6175324d 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -5,9 +5,9 @@ cc_library(fetch_op_handle SRCS fetch_op_handle.cc DEPS op_handle_base scope lod cc_library(computation_op_handle SRCS computation_op_handle.cc DEPS framework_proto scope place operator op_registry) cc_library(rpc_op_handle SRCS rpc_op_handle.cc DEPS framework_proto scope place operator op_registry) -cc_library(ssa_graph_builder SRCS ssa_graph_builder.cc DEPS graph graph_helper) -cc_library(ssa_graph_printer SRCS ssa_graph_printer.cc DEPS ssa_graph_builder) -cc_library(ssa_graph_checker SRCS ssa_graph_checker.cc DEPS ssa_graph_builder) +cc_library(multi_devices_helper SRCS multi_devices_helper.cc DEPS graph graph_helper) +cc_library(multi_devices_graph_print_pass SRCS multi_devices_graph_print_pass.cc DEPS multi_devices_helper) +cc_library(multi_devices_graph_check_pass SRCS multi_devices_graph_check_pass.cc DEPS multi_devices_helper) cc_library(variable_visitor SRCS variable_visitor.cc DEPS lod_tensor selected_rows) @@ -28,7 +28,7 @@ cc_library(data_balance_op_handle SRCS data_balance_op_handle.cc DEPS op_handle_ cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor) cc_library(fuse_vars_op_handle SRCS fuse_vars_op_handle.cc DEPS op_handle_base scope) -cc_library(multi_devices_graph_builder SRCS multi_devices_graph_builder.cc DEPS ssa_graph_builder computation_op_handle +cc_library(multi_devices_graph_pass SRCS multi_devices_graph_pass.cc DEPS multi_devices_helper computation_op_handle scale_loss_grad_op_handle rpc_op_handle all_reduce_op_handle reduce_op_handle broadcast_op_handle data_balance_op_handle) cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS graph framework_proto) diff --git a/paddle/fluid/framework/details/ssa_graph_checker.cc b/paddle/fluid/framework/details/multi_devices_graph_check_pass.cc similarity index 95% rename from paddle/fluid/framework/details/ssa_graph_checker.cc rename to paddle/fluid/framework/details/multi_devices_graph_check_pass.cc index b9e1cda1f24810009bc74a7abdf0156f723a1755..c9c255864a2477ed29873f8521acce37fa928c06 100644 --- a/paddle/fluid/framework/details/ssa_graph_checker.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_check_pass.cc @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "paddle/fluid/framework/details/ssa_graph_checker.h" +#include "paddle/fluid/framework/details/multi_devices_graph_check_pass.h" #include #include "paddle/fluid/framework/ir/graph.h" @@ -86,7 +86,7 @@ bool SSAGraghBuilderWithChecker::IsValidGraph(const ir::Graph *graph) const { } // namespace framework } // namespace paddle -REGISTER_PASS(multi_device_check_pass, +REGISTER_PASS(multi_devices_check_pass, paddle::framework::details::SSAGraghBuilderWithChecker) .RequireGraphAttr(paddle::framework::details::kGraphVars) .RequireGraphAttr(paddle::framework::details::kGraphDepVars) diff --git a/paddle/fluid/framework/details/ssa_graph_checker.h b/paddle/fluid/framework/details/multi_devices_graph_check_pass.h similarity index 89% rename from paddle/fluid/framework/details/ssa_graph_checker.h rename to paddle/fluid/framework/details/multi_devices_graph_check_pass.h index 0e861ecb236361992d9883e3bd0e679f7563b539..1e2b1867c376956d7d2dac465c13e2f3f64ba7eb 100644 --- a/paddle/fluid/framework/details/ssa_graph_checker.h +++ b/paddle/fluid/framework/details/multi_devices_graph_check_pass.h @@ -14,7 +14,7 @@ #pragma once -#include "paddle/fluid/framework/details/ssa_graph_builder.h" +#include "paddle/fluid/framework/details/multi_devices_helper.h" #include @@ -22,7 +22,7 @@ namespace paddle { namespace framework { namespace details { -class SSAGraghBuilderWithChecker : public SSAGraphBuilder { +class SSAGraghBuilderWithChecker : public ir::Pass { protected: std::unique_ptr ApplyImpl( std::unique_ptr graph) const override { diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc similarity index 90% rename from paddle/fluid/framework/details/multi_devices_graph_builder.cc rename to paddle/fluid/framework/details/multi_devices_graph_pass.cc index a4fdbcb26d1d0cfb05edebff5419d9559c336b3a..c5a13e7e1f45e1eb9b4271880630c52d30022f4b 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -21,7 +21,7 @@ #include "paddle/fluid/framework/details/broadcast_op_handle.h" #include "paddle/fluid/framework/details/computation_op_handle.h" #include "paddle/fluid/framework/details/data_balance_op_handle.h" -#include "paddle/fluid/framework/details/multi_devices_graph_builder.h" +#include "paddle/fluid/framework/details/multi_devices_graph_pass.h" #include "paddle/fluid/framework/details/reduce_op_handle.h" #include "paddle/fluid/framework/details/rpc_op_handle.h" #include "paddle/fluid/framework/details/scale_loss_grad_op_handle.h" @@ -33,6 +33,92 @@ namespace paddle { namespace framework { namespace details { +namespace { +void PolishGraphToSupportDataHazards(ir::Graph *graph) { + for (auto &var_map : graph->Get(kGraphVars)) { + for (auto &name_pair : var_map) { + if (name_pair.second.size() <= 1) { + continue; + } + auto it_new = name_pair.second.rbegin(); + auto it_old = name_pair.second.rbegin(); + ++it_old; + for (; it_old != name_pair.second.rend(); it_new = it_old, ++it_old) { + OpHandleBase *write_op = (*it_new)->GeneratedOp(); + const auto &read_ops = (*it_old)->PendingOps(); + + for (auto *read_op : read_ops) { + // Manually add a dependency var from read_op to write_op; + if (read_op == write_op) { + // Read Write is the same op. + continue; + } + bool has_dep = false; + for (auto *r_out : read_op->Outputs()) { + for (auto *w_in : write_op->Inputs()) { + if (r_out->Node() == w_in->Node()) { + has_dep = true; + break; + } + } + } + if (has_dep) continue; + + auto *dep_var = new DummyVarHandle(graph->CreateControlDepVar()); + read_op->AddOutput(dep_var); + write_op->AddInput(dep_var); + graph->Get(kGraphDepVars).emplace(dep_var); + } + } + } + } +} + +VarHandle *CreateOrGetLatestVarHandle(ir::Graph *graph, ir::Node *node, + const platform::Place &place, + size_t place_offset) { + auto &var_holders = graph->Get(kGraphVars)[place_offset]; + auto &var_holder = var_holders[node->Name()]; + VarHandle *var = nullptr; + if (var_holder.empty()) { + if (node->Var()) { + var = new VarHandle(graph->CreateVarNode(node->Var()), 0, place_offset, + node->Name(), place); + } else { + var = new VarHandle( + graph->CreateEmptyNode(node->Name(), ir::Node::Type::kVariable), 0, + place_offset, node->Name(), place); + } + var_holder.emplace_back(var); + } else { + var = var_holder.rbegin()->get(); + } + return var; +} + +void CreateOpOutput(ir::Graph *graph, OpHandleBase *op_handle, + ir::Node *new_node, const platform::Place &place, + size_t place_offset) { + auto &vars = + graph->Get(kGraphVars)[place_offset][new_node->Name()]; + size_t version = vars.size(); + auto var = + new VarHandle(new_node, version, place_offset, new_node->Name(), place); + vars.emplace_back(var); + op_handle->AddOutput(var); +} + +void AddOutputToLeafOps(ir::Graph *graph) { + for (auto &op : graph->Get(kGraphOps)) { + if (!op->Outputs().empty()) { + continue; + } + auto *dummy_leaf = new DummyVarHandle(graph->CreateControlDepVar()); + graph->Get(kGraphDepVars).emplace(dummy_leaf); + op->AddOutput(dummy_leaf); + } +} +} // namespace static const char kLossVarName[] = "loss_var_name"; static const char kPlaces[] = "places"; @@ -751,7 +837,7 @@ bool MultiDevSSAGraphBuilder::IsScaleLossOp(ir::Node *node) const { } // namespace framework } // namespace paddle -REGISTER_PASS(multi_device_pass, +REGISTER_PASS(multi_devices_pass, paddle::framework::details::MultiDevSSAGraphBuilder) .RequirePassAttr(paddle::framework::details::kLossVarName) .RequirePassAttr(paddle::framework::details::kPlaces) diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.h b/paddle/fluid/framework/details/multi_devices_graph_pass.h similarity index 96% rename from paddle/fluid/framework/details/multi_devices_graph_builder.h rename to paddle/fluid/framework/details/multi_devices_graph_pass.h index f2cb6bb1c861e07f1034f1742ad4f3cfbb0d8837..7a6f238f9cf7af18cb10ea271e453fec1902c833 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.h +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.h @@ -18,7 +18,7 @@ #include #include "paddle/fluid/framework/details/build_strategy.h" -#include "paddle/fluid/framework/details/ssa_graph_builder.h" +#include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/ir/graph.h" namespace paddle { @@ -30,7 +30,7 @@ namespace framework { class Scope; namespace details { -class MultiDevSSAGraphBuilder : public SSAGraphBuilder { +class MultiDevSSAGraphBuilder : public ir::Pass { protected: std::unique_ptr ApplyImpl( std::unique_ptr graph) const override; diff --git a/paddle/fluid/framework/details/ssa_graph_printer.cc b/paddle/fluid/framework/details/multi_devices_graph_print_pass.cc similarity index 95% rename from paddle/fluid/framework/details/ssa_graph_printer.cc rename to paddle/fluid/framework/details/multi_devices_graph_print_pass.cc index ec3f31ab8d135efd2c77018e90cec46b25ca5e66..69944a42b688a9ea5ff29f75f18dd4b156848a27 100644 --- a/paddle/fluid/framework/details/ssa_graph_printer.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_print_pass.cc @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "paddle/fluid/framework/details/ssa_graph_printer.h" +#include "paddle/fluid/framework/details/multi_devices_graph_print_pass.h" #include #include "paddle/fluid/framework/ir/graph.h" @@ -82,5 +82,5 @@ void GraphvizSSAGraphPrinter::Print(const ir::Graph &graph, } // namespace framework } // namespace paddle -REGISTER_PASS(multi_device_print_pass, +REGISTER_PASS(multi_devices_print_pass, paddle::framework::details::SSAGraghBuilderWithPrinter); diff --git a/paddle/fluid/framework/details/ssa_graph_printer.h b/paddle/fluid/framework/details/multi_devices_graph_print_pass.h similarity index 92% rename from paddle/fluid/framework/details/ssa_graph_printer.h rename to paddle/fluid/framework/details/multi_devices_graph_print_pass.h index 5eafd1805c3102dbd3cdfa68ee1495631c182b51..c00685fa1629c0722c315c726053c2cba8bf17e7 100644 --- a/paddle/fluid/framework/details/ssa_graph_printer.h +++ b/paddle/fluid/framework/details/multi_devices_graph_print_pass.h @@ -18,7 +18,7 @@ #include #include #include -#include "paddle/fluid/framework/details/ssa_graph_builder.h" +#include "paddle/fluid/framework/details/multi_devices_helper.h" namespace paddle { namespace framework { @@ -35,7 +35,7 @@ class GraphvizSSAGraphPrinter : public SSAGraphPrinter { void Print(const ir::Graph& graph, std::ostream& sout) const override; }; -class SSAGraghBuilderWithPrinter : public SSAGraphBuilder { +class SSAGraghBuilderWithPrinter : public ir::Pass { protected: std::unique_ptr ApplyImpl( std::unique_ptr graph) const override { diff --git a/paddle/fluid/framework/details/multi_devices_helper.cc b/paddle/fluid/framework/details/multi_devices_helper.cc new file mode 100644 index 0000000000000000000000000000000000000000..0242274a16c50508f2c0294264c175515c7293ef --- /dev/null +++ b/paddle/fluid/framework/details/multi_devices_helper.cc @@ -0,0 +1,20 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include "paddle/fluid/framework/details/multi_devices_helper.h" + +namespace paddle { +namespace framework { +namespace details {} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/ssa_graph_builder.h b/paddle/fluid/framework/details/multi_devices_helper.h similarity index 68% rename from paddle/fluid/framework/details/ssa_graph_builder.h rename to paddle/fluid/framework/details/multi_devices_helper.h index 53a4ad003d51a27a044d7a142434545eca0d5965..175c5a9950be69d7bf6ae9e386af762007a18a51 100644 --- a/paddle/fluid/framework/details/ssa_graph_builder.h +++ b/paddle/fluid/framework/details/multi_devices_helper.h @@ -52,33 +52,6 @@ const char kGraphOps[] = "ops"; typedef std::unordered_map ShardedVarDevice; const char kShardedVarDevice[] = "sharded_var_device"; - -class SSAGraphBuilder : public ir::Pass { - public: - SSAGraphBuilder() {} - virtual ~SSAGraphBuilder() {} - - DISABLE_COPY_AND_ASSIGN(SSAGraphBuilder); - - protected: - /* - Dependency graph has been constructed. However, there are still data - hazards need to be handled. - */ - static void PolishGraphToSupportDataHazards(ir::Graph *graph); - - static VarHandle *CreateOrGetLatestVarHandle(ir::Graph *graph, ir::Node *node, - const platform::Place &place, - size_t place_offset); - - // Add an output variable (each_var_name, place, place_offset) to op_handle, - // which belongs to graph - static void CreateOpOutput(ir::Graph *graph, OpHandleBase *op_handle, - ir::Node *new_node, const platform::Place &place, - size_t place_offset); - - static void AddOutputToLeafOps(ir::Graph *graph); -}; } // namespace details } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/details/ssa_graph_builder.cc b/paddle/fluid/framework/details/ssa_graph_builder.cc deleted file mode 100644 index 575532540a624afde5f6dab25b11e9eac93c6448..0000000000000000000000000000000000000000 --- a/paddle/fluid/framework/details/ssa_graph_builder.cc +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -#include "paddle/fluid/framework/details/ssa_graph_builder.h" -#include - -namespace paddle { -namespace framework { -namespace details { -void SSAGraphBuilder::PolishGraphToSupportDataHazards(ir::Graph *graph) { - for (auto &var_map : graph->Get(kGraphVars)) { - for (auto &name_pair : var_map) { - if (name_pair.second.size() <= 1) { - continue; - } - auto it_new = name_pair.second.rbegin(); - auto it_old = name_pair.second.rbegin(); - ++it_old; - for (; it_old != name_pair.second.rend(); it_new = it_old, ++it_old) { - OpHandleBase *write_op = (*it_new)->GeneratedOp(); - const auto &read_ops = (*it_old)->PendingOps(); - - for (auto *read_op : read_ops) { - // Manually add a dependency var from read_op to write_op; - if (read_op == write_op) { - // Read Write is the same op. - continue; - } - bool has_dep = false; - for (auto *r_out : read_op->Outputs()) { - for (auto *w_in : write_op->Inputs()) { - if (r_out->Node() == w_in->Node()) { - has_dep = true; - break; - } - } - } - if (has_dep) continue; - - auto *dep_var = new DummyVarHandle(graph->CreateControlDepVar()); - read_op->AddOutput(dep_var); - write_op->AddInput(dep_var); - graph->Get(kGraphDepVars).emplace(dep_var); - } - } - } - } -} - -VarHandle *SSAGraphBuilder::CreateOrGetLatestVarHandle( - ir::Graph *graph, ir::Node *node, const platform::Place &place, - size_t place_offset) { - auto &var_holders = graph->Get(kGraphVars)[place_offset]; - auto &var_holder = var_holders[node->Name()]; - VarHandle *var = nullptr; - if (var_holder.empty()) { - if (node->Var()) { - var = new VarHandle(graph->CreateVarNode(node->Var()), 0, place_offset, - node->Name(), place); - } else { - var = new VarHandle( - graph->CreateEmptyNode(node->Name(), ir::Node::Type::kVariable), 0, - place_offset, node->Name(), place); - } - var_holder.emplace_back(var); - } else { - var = var_holder.rbegin()->get(); - } - return var; -} - -void SSAGraphBuilder::CreateOpOutput(ir::Graph *graph, OpHandleBase *op_handle, - ir::Node *new_node, - const platform::Place &place, - size_t place_offset) { - auto &vars = - graph->Get(kGraphVars)[place_offset][new_node->Name()]; - size_t version = vars.size(); - auto var = - new VarHandle(new_node, version, place_offset, new_node->Name(), place); - vars.emplace_back(var); - op_handle->AddOutput(var); -} - -void SSAGraphBuilder::AddOutputToLeafOps(ir::Graph *graph) { - for (auto &op : graph->Get(kGraphOps)) { - if (!op->Outputs().empty()) { - continue; - } - auto *dummy_leaf = new DummyVarHandle(graph->CreateControlDepVar()); - graph->Get(kGraphDepVars).emplace(dummy_leaf); - op->AddOutput(dummy_leaf); - } -} -} // namespace details -} // namespace framework -} // namespace paddle diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index 0eaf9a9c951991a5775604eb8d0e7535f81a4ae2..994bb6492f685138d02971a6caf12572aecd6d6f 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -14,7 +14,7 @@ #include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h" -#include "paddle/fluid/framework/details/ssa_graph_builder.h" +#include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/platform/profiler.h" namespace paddle { diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index b5f01a9a2b76472063658f1a051a2ee3c65559b7..275cb8c592c3c0b153d31149570cd6596b9e1a7f 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -25,9 +25,9 @@ limitations under the License. */ #include "paddle/fluid/platform/nccl_helper.h" #endif +#include "paddle/fluid/framework/details/multi_devices_graph_check_pass.h" +#include "paddle/fluid/framework/details/multi_devices_graph_print_pass.h" #include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h" -#include "paddle/fluid/framework/details/ssa_graph_checker.h" -#include "paddle/fluid/framework/details/ssa_graph_printer.h" #include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h" #include "paddle/fluid/platform/profiler.h" @@ -57,39 +57,39 @@ std::unique_ptr ApplyParallelExecutorPass( } // Convert graph to run on multi-devices. - auto multi_device_pass = - ir::PassRegistry::Instance().Get("multi_device_pass"); - multi_device_pass->SetNotOwned>("places", - &places); - multi_device_pass->SetNotOwned("loss_var_name", - &loss_var_name); - multi_device_pass->SetNotOwned>( + auto multi_devices_pass = + ir::PassRegistry::Instance().Get("multi_devices_pass"); + multi_devices_pass->SetNotOwned>("places", + &places); + multi_devices_pass->SetNotOwned("loss_var_name", + &loss_var_name); + multi_devices_pass->SetNotOwned>( "params", ¶m_names); - multi_device_pass->SetNotOwned>("local_scopes", - &local_scopes); - multi_device_pass->SetNotOwned("strategy", &strategy); + multi_devices_pass->SetNotOwned>("local_scopes", + &local_scopes); + multi_devices_pass->SetNotOwned("strategy", &strategy); #ifdef PADDLE_WITH_CUDA platform::NCCLContextMap *nctx = use_cuda ? nccl_ctxs : nullptr; - multi_device_pass->SetNotOwned("nccl_ctxs", nctx); + multi_devices_pass->SetNotOwned("nccl_ctxs", nctx); #endif - graph = multi_device_pass->Apply(std::move(graph)); + graph = multi_devices_pass->Apply(std::move(graph)); // Apply a graph print pass to record a graph with device info. if (!strategy.debug_graphviz_path_.empty()) { - auto multi_device_print_pass = - ir::PassRegistry::Instance().Get("multi_device_print_pass"); - multi_device_print_pass->SetNotOwned( + auto multi_devices_print_pass = + ir::PassRegistry::Instance().Get("multi_devices_print_pass"); + multi_devices_print_pass->SetNotOwned( "debug_graphviz_path", &strategy.debug_graphviz_path_); - multi_device_print_pass->Set( + multi_devices_print_pass->Set( "graph_printer", new details::GraphvizSSAGraphPrinter); - graph = multi_device_print_pass->Apply(std::move(graph)); + graph = multi_devices_print_pass->Apply(std::move(graph)); } // Verify that the graph is correct for multi-device executor. - auto multi_device_check_pass = - ir::PassRegistry::Instance().Get("multi_device_check_pass"); - graph = multi_device_check_pass->Apply(std::move(graph)); + auto multi_devices_check_pass = + ir::PassRegistry::Instance().Get("multi_devices_check_pass"); + graph = multi_devices_check_pass->Apply(std::move(graph)); return graph; } @@ -354,6 +354,6 @@ ParallelExecutor::~ParallelExecutor() { } // namespace paddle USE_PASS(graph_viz_pass); -USE_PASS(multi_device_pass); -USE_PASS(multi_device_check_pass); -USE_PASS(multi_device_print_pass); +USE_PASS(multi_devices_pass); +USE_PASS(multi_devices_check_pass); +USE_PASS(multi_devices_print_pass); diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index d624956acde86cefc4ec1dec80df3738bcf1d8be..5fb748fa205d5e9dbd2943b615c69aedd0e7a26f 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -19,7 +19,7 @@ limitations under the License. */ #include #include #include "paddle/fluid/framework/details/execution_strategy.h" -#include "paddle/fluid/framework/details/multi_devices_graph_builder.h" +#include "paddle/fluid/framework/details/multi_devices_graph_pass.h" #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/op_info.h" #include "paddle/fluid/framework/program_desc.h" diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt index 4c3b8ec78190723598a56f7633764f10dd5047f3..ff0e989464e76b0f7cb163bd95997b82303036d6 100644 --- a/paddle/fluid/operators/CMakeLists.txt +++ b/paddle/fluid/operators/CMakeLists.txt @@ -235,7 +235,12 @@ else() endif() op_library(cross_entropy_op DEPS cross_entropy) -op_library(softmax_with_cross_entropy_op DEPS cross_entropy softmax) +if(WITH_GPU) + op_library(softmax_with_cross_entropy_op DEPS cross_entropy softmax cub) +else() + op_library(softmax_with_cross_entropy_op DEPS cross_entropy softmax) +endif() + op_library(softmax_op DEPS softmax) op_library(sequence_softmax_op DEPS softmax) if (WITH_GPU AND TENSORRT_FOUND) @@ -273,9 +278,9 @@ op_library(squeeze_op DEPS reshape_op) op_library(extract_rows_op DEPS memory) op_library(flatten_op DEPS reshape_op) - if (WITH_GPU) op_library(conv_op DEPS vol2col depthwise_conv im2col) + op_library(layer_norm_op DEPS cub) else() op_library(conv_op DEPS vol2col im2col) endif() diff --git a/paddle/fluid/operators/crop_op.cc b/paddle/fluid/operators/crop_op.cc index 5b5a220cf90e7813f914ae35733e7a4103391b2d..a2a871efa850df5101be7c27ebd81456acace7e1 100644 --- a/paddle/fluid/operators/crop_op.cc +++ b/paddle/fluid/operators/crop_op.cc @@ -1,4 +1,4 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -188,6 +188,7 @@ namespace ops = paddle::operators; REGISTER_OPERATOR(crop, ops::CropOp, ops::CropOpMaker, paddle::framework::DefaultGradOpDescMaker); REGISTER_OPERATOR(crop_grad, ops::CropOpGrad); -REGISTER_OP_CPU_KERNEL(crop, ops::CropKernel); +REGISTER_OP_CPU_KERNEL( + crop, ops::CropKernel); REGISTER_OP_CPU_KERNEL( crop_grad, ops::CropGradKernel); diff --git a/paddle/fluid/operators/crop_op.cu b/paddle/fluid/operators/crop_op.cu index 1a391860463dba14ad0de755ceb659bc9f64adc9..b75678217e36aa2297c68a7f8e2a9dfafadaca72 100644 --- a/paddle/fluid/operators/crop_op.cu +++ b/paddle/fluid/operators/crop_op.cu @@ -1,4 +1,4 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ limitations under the License. */ #include "paddle/fluid/operators/crop_op.h" namespace ops = paddle::operators; -REGISTER_OP_CUDA_KERNEL(crop, ops::CropKernel); +REGISTER_OP_CUDA_KERNEL( + crop, ops::CropKernel); REGISTER_OP_CUDA_KERNEL( crop_grad, ops::CropGradKernel); diff --git a/paddle/fluid/operators/crop_op.h b/paddle/fluid/operators/crop_op.h index 772e80bbea4f2db654cefd0dcb404bc33803bd7a..2d7d33bd4f9b42b644444912570375bad92ba6c2 100644 --- a/paddle/fluid/operators/crop_op.h +++ b/paddle/fluid/operators/crop_op.h @@ -1,4 +1,4 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -58,32 +58,74 @@ static std::vector GetOffsets(const framework::ExecutionContext& ctx) { return res; } -template +template +void CropFunction(const framework::ExecutionContext& context) { + auto* x = context.Input("X"); + auto* out = context.Output("Out"); + auto out_dims = out->dims(); + if (out_dims[0] == -1) { + out_dims[0] = x->dims()[0]; + } + out->mutable_data(out_dims, context.GetPlace()); + auto x_stride = framework::stride(x->dims()); + auto out_stride = framework::stride(out->dims()); + auto offsets = GetOffsets(context); + int64_t offset = 0; + for (size_t i = 0; i < offsets.size(); ++i) { + offset += (x_stride[i] * offsets[i]); + } + + auto x_tensor = EigenTensor::From(*x); + auto out_tensor = EigenTensor::From(*out); + Eigen::array e_offsets; + Eigen::array e_shape; + for (size_t i = 0; i < D; ++i) { + e_offsets[i] = offsets[i]; + e_shape[i] = out->dims()[i]; + } + auto& place = + *context.template device_context().eigen_device(); + out_tensor.device(place) = x_tensor.slice(e_offsets, e_shape); +} + +template class CropKernel : public framework::OpKernel { public: void Compute(const framework::ExecutionContext& context) const override { - auto* x = context.Input("X"); - auto* out = context.Output("Out"); - const T* x_data = x->data(); - T* out_data = out->mutable_data(context.GetPlace()); - auto x_stride = framework::stride(x->dims()); - auto out_stride = framework::stride(out->dims()); - auto offsets = GetOffsets(context); - int64_t offset = 0; - for (size_t i = 0; i < offsets.size(); ++i) { - offset += (x_stride[i] * offsets[i]); + int rank = context.Input("X")->dims().size(); + switch (rank) { + case 1: + CropFunction(context); + break; + case 2: + CropFunction(context); + break; + case 3: + CropFunction(context); + break; + case 4: + CropFunction(context); + break; + case 5: + CropFunction(context); + break; + case 6: + CropFunction(context); + break; + default: + PADDLE_THROW( + "CropOp only support tensors with no more than 6 dimensions."); } - StridedMemcpy(context.device_context(), x_data + offset, x_stride, - out->dims(), out_stride, out_data); } }; template void CropGradFunction(const framework::ExecutionContext& context) { auto* d_x = context.Output(framework::GradVarName("X")); + auto* x = context.Input("X"); if (d_x != nullptr) { auto* d_out = context.Input(framework::GradVarName("Out")); - d_x->mutable_data(context.GetPlace()); + d_x->mutable_data(x->dims(), context.GetPlace()); auto offsets = GetOffsets(context); Eigen::array, D> paddings; for (size_t i = 0; i < D; ++i) { diff --git a/paddle/fluid/operators/detection/mine_hard_examples_op.cc b/paddle/fluid/operators/detection/mine_hard_examples_op.cc index d4a09bae3a98e4518f9885c1e9182f7033a0d262..54a4b87ec8f13c4d474aad4cc0b8159cd5f59d1c 100644 --- a/paddle/fluid/operators/detection/mine_hard_examples_op.cc +++ b/paddle/fluid/operators/detection/mine_hard_examples_op.cc @@ -227,6 +227,9 @@ class MineHardExamplesOp : public framework::OperatorWithKernel { PADDLE_ENFORCE_GT( neg_pos_ratio, 0.0f, "neg_pos_ratio must greater than zero in max_negative mode"); + PADDLE_ENFORCE_LT( + neg_dist_threshold, 1.0f, + "neg_dist_threshold must less than one in max_negative mode"); PADDLE_ENFORCE_GT( neg_dist_threshold, 0.0f, "neg_dist_threshold must greater than zero in max_negative mode"); diff --git a/paddle/fluid/operators/distributed/request_handler_impl.cc b/paddle/fluid/operators/distributed/request_handler_impl.cc index 55995783c6eab10632ab2a5bca64ca856f000df1..de1a503154deb967eb4389a9f43b86c05626d966 100644 --- a/paddle/fluid/operators/distributed/request_handler_impl.cc +++ b/paddle/fluid/operators/distributed/request_handler_impl.cc @@ -41,6 +41,7 @@ bool RequestSendHandler::Handle(const std::string& varname, // Async if (!sync_mode_) { + rpc_server_->Profiler().OneStep(); try { executor_->RunPreparedContext((*grad_to_prepared_ctx_)[varname].get(), scope); diff --git a/paddle/fluid/operators/distributed/rpc_server.cc b/paddle/fluid/operators/distributed/rpc_server.cc index 83b14fa64d735d80f43bf55c798cddb2f3ea7032..406e7294c190172347d432fb155c2a81c43dda25 100644 --- a/paddle/fluid/operators/distributed/rpc_server.cc +++ b/paddle/fluid/operators/distributed/rpc_server.cc @@ -18,11 +18,44 @@ #include #include "paddle/fluid/operators/distributed/rpc_server.h" +#include "paddle/fluid/platform/profiler.h" + +DEFINE_int32(rpc_server_profile_period, 0, + "the period of listen_and_serv to do profile"); +DEFINE_string(rpc_server_profile_path, "/dev/null", + "the profile log file path"); namespace paddle { namespace operators { namespace distributed { +RPCServerProfiler::RPCServerProfiler(int profile_period, + const std::string& profile_log_path) + : profile_period_(profile_period), profile_log_path_(profile_log_path) { + step_ = 0; +} + +void RPCServerProfiler::OneStep() { + PADDLE_ENFORCE_LE(step_, profile_period_, + "step_ should not be larger then " + "profile_period_"); + if (profile_period_ <= 0) { + return; + } + + if (step_ == 0) { + auto pf_state = paddle::platform::ProfilerState::kCPU; + paddle::platform::EnableProfiler(pf_state); + } + if (step_ == profile_period_) { + paddle::platform::DisableProfiler(paddle::platform::EventSortingKey::kTotal, + profile_log_path_); + step_ = 0; + } else { + step_++; + } +} + void RPCServer::ShutDown() { LOG(INFO) << "RPCServer ShutDown "; ShutDownImpl(); diff --git a/paddle/fluid/operators/distributed/rpc_server.h b/paddle/fluid/operators/distributed/rpc_server.h index fd914d7a72e61bc9472876c433b65598ef5b1980..d813ba03e2fbec6e808f59f814a9b2f4bfbcd77b 100644 --- a/paddle/fluid/operators/distributed/rpc_server.h +++ b/paddle/fluid/operators/distributed/rpc_server.h @@ -19,16 +19,33 @@ #include // NOLINT #include #include + #include "paddle/fluid/operators/distributed/request_handler.h" +DECLARE_int32(rpc_server_profile_period); +DECLARE_string(rpc_server_profile_path); + namespace paddle { namespace operators { namespace distributed { +class RPCServerProfiler { + public: + RPCServerProfiler(int profile_period, const std::string& profile_log_path); + void OneStep(); + + private: + const int profile_period_; + std::string profile_log_path_; + int step_; +}; + class RPCServer { public: explicit RPCServer(const std::string& address, int client_num) : cur_cond_(0), + profiler_(FLAGS_rpc_server_profile_period, + FLAGS_rpc_server_profile_path), bind_address_(address), exit_flag_(false), selected_port_(0), @@ -67,6 +84,7 @@ class RPCServer { void Complete(); void ResetBarrierCounter(); + RPCServerProfiler& Profiler() { return profiler_; } protected: virtual void ShutDownImpl() = 0; @@ -79,6 +97,7 @@ class RPCServer { std::unordered_map rpc_cond_map_; std::atomic cur_cond_; std::condition_variable rpc_cond_; + RPCServerProfiler profiler_; protected: std::string bind_address_; diff --git a/paddle/fluid/operators/elementwise_add_op.cu b/paddle/fluid/operators/elementwise_add_op.cu index dfff518f170b56d180b6883c363effb8dbd677b6..6cbf6066c92b02eb75922587f9da5192bed15580 100644 --- a/paddle/fluid/operators/elementwise_add_op.cu +++ b/paddle/fluid/operators/elementwise_add_op.cu @@ -16,6 +16,60 @@ limitations under the License. */ #include "paddle/fluid/operators/elementwise_add_op.h" #include "paddle/fluid/platform/float16.h" +namespace paddle { +namespace operators { + +template +__global__ void ElementwiseAddCUDAKernel(const T *x, const T *y, T *z, int n, + int post, int size) { + int idx_x = threadIdx.x + blockIdx.x * blockDim.x; + if (idx_x < size) { + int idx_y = idx_x / post - (idx_x / (n * post)) * n; + z[idx_x] = x[idx_x] + y[idx_y]; + } +} + +template +class ElementwiseAddKernel + : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext &ctx) const override { + using Tensor = framework::Tensor; + + const auto x = ctx.Input("X"); + const auto y = ctx.Input("Y"); + auto z = ctx.Output("Out"); + auto *z_data = z->mutable_data(ctx.GetPlace()); + + auto &device = *(ctx.cuda_device_context().eigen_device()); + const framework::DDim &x_dim = x->dims(); + framework::DDim y_dim = y->dims(); + int size = x->numel(); + if (x_dim == y_dim) { + auto dim = framework::make_ddim({size}); + auto z_eigen = framework::EigenTensor::From(*z, dim); + auto x_eigen = framework::EigenTensor::From(*x, dim); + auto y_eigen = framework::EigenTensor::From(*y, dim); + z_eigen.device(device) = x_eigen + y_eigen; + } else { + int axis = ctx.Attr("axis"); + axis = (axis == -1 ? x_dim.size() - y_dim.size() : axis); + y_dim = trim_trailing_singular_dims(y_dim); + axis = (y_dim.size() == 0) ? x_dim.size() : axis; + int pre, n, post; + get_mid_dims(x_dim, y_dim, axis, &pre, &n, &post); + int threads = 512; + int grids = (size + threads - 1) / threads; + auto stream = ctx.cuda_device_context().stream(); + ElementwiseAddCUDAKernel<<>>( + x->data(), y->data(), z_data, n, post, size); + } + } +}; + +} // namespace operators +} // namespace paddle + namespace ops = paddle::operators; namespace plat = paddle::platform; diff --git a/paddle/fluid/operators/elementwise_add_op.h b/paddle/fluid/operators/elementwise_add_op.h index 5356105e2e551c0528694091608fc7585dce66d2..0b19723720171a857c946880c246e2247a0023a7 100644 --- a/paddle/fluid/operators/elementwise_add_op.h +++ b/paddle/fluid/operators/elementwise_add_op.h @@ -144,16 +144,41 @@ class ElementwiseAddGradKernel : public framework::OpKernel { auto* dout = ctx.Input(framework::GradVarName("Out")); auto* dx = ctx.Output(framework::GradVarName("X")); auto* dy = ctx.Output(framework::GradVarName("Y")); - // skip out, x, y - auto* out = dout; - auto *x = dout, *y = dout; - if (platform::is_cpu_place(ctx.GetPlace()) && dx != nullptr && - dy != nullptr && (dx->dims() == dy->dims())) { - elementwise_add_grad(ctx, x, y, out, dout, dx, dy); + if (dx != nullptr) { + // In fact, we can just share memory, but it may cause a bug of memory + // optimizer + // dx->ShareDataWith(*dout); + framework::TensorCopy(*dout, ctx.GetPlace(), + ctx.template device_context(), dx); + } + + if (dy == nullptr) return; + + const framework::DDim& x_dim = dout->dims(); + framework::DDim y_dim = dy->dims(); + if (x_dim == y_dim) { + // dy->ShareDataWith(*dout); + framework::TensorCopy(*dout, ctx.GetPlace(), + ctx.template device_context(), dy); } else { - default_elementwise_add_grad(ctx, x, y, out, dout, dx, - dy); + dy->mutable_data(ctx.GetPlace()); + // Perform reduction to dout to calculate dy + int axis = ctx.Attr("axis"); + axis = (axis == -1 ? x_dim.size() - y_dim.size() : axis); + y_dim = trim_trailing_singular_dims(y_dim); + axis = (y_dim.size() == 0) ? x_dim.size() : axis; + + auto& device = + *(ctx.template device_context().eigen_device()); + int pre, n, post; + get_mid_dims(x_dim, y_dim, axis, &pre, &n, &post); + auto eigen_dout = framework::EigenTensor::From( + *dout, framework::make_ddim({pre, n, post})); + auto eigen_dy = + framework::EigenTensor::From(*dy, framework::make_ddim({n})); + eigen_dy.device(device) = eigen_dout.sum( + framework::EigenDim<2>::From(framework::make_ddim({0, 2}))); } } }; diff --git a/paddle/fluid/operators/layer_norm_op.cu b/paddle/fluid/operators/layer_norm_op.cu index 6840e1e08f3d5bc84a05f15e30982c7cfb59680b..0886c41a1b582881faf24f5531d414db4e4db71c 100644 --- a/paddle/fluid/operators/layer_norm_op.cu +++ b/paddle/fluid/operators/layer_norm_op.cu @@ -1,4 +1,4 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -12,8 +12,512 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#include #include "paddle/fluid/operators/layer_norm_op.h" +namespace paddle { +namespace operators { + +inline static int GetDesiredBlockDim(int block_dim) { + const int kMaxBlockDim = 512; + return block_dim >= kMaxBlockDim + ? kMaxBlockDim + : (1 << (static_cast(std::log2f(block_dim)))); +} + +#define FIXED_BLOCK_DIM_CASE_BASE(log2_block_dim, ...) \ + case (1 << (log2_block_dim)): { \ + constexpr auto kBlockDim = (1 << (log2_block_dim)); \ + __VA_ARGS__; \ + } break + +#define FIXED_BLOCK_DIM_CASE(...) \ + FIXED_BLOCK_DIM_CASE_BASE(9, ##__VA_ARGS__); \ + FIXED_BLOCK_DIM_CASE_BASE(8, ##__VA_ARGS__); \ + FIXED_BLOCK_DIM_CASE_BASE(7, ##__VA_ARGS__); \ + FIXED_BLOCK_DIM_CASE_BASE(6, ##__VA_ARGS__); \ + FIXED_BLOCK_DIM_CASE_BASE(5, ##__VA_ARGS__); \ + FIXED_BLOCK_DIM_CASE_BASE(4, ##__VA_ARGS__); \ + FIXED_BLOCK_DIM_CASE_BASE(3, ##__VA_ARGS__); \ + FIXED_BLOCK_DIM_CASE_BASE(2, ##__VA_ARGS__); \ + FIXED_BLOCK_DIM_CASE_BASE(1, ##__VA_ARGS__) + +static __device__ __forceinline__ float real_sqrt(float x) { return sqrtf(x); } +static __device__ __forceinline__ double real_sqrt(double x) { return sqrt(x); } + +template +struct PairForLayerNorm { + __device__ __forceinline__ PairForLayerNorm() {} + __device__ __forceinline__ PairForLayerNorm(const T &first, const T &second) + : first_(first), second_(second) {} + + T first_; + T second_; +}; + +template +struct PairForLayerNormAddFunctor { + __device__ __forceinline__ PairForLayerNorm operator()( + const PairForLayerNorm &p1, const PairForLayerNorm &p2) { + return PairForLayerNorm(p1.first_ + p2.first_, p1.second_ + p2.second_); + } +}; + +template +__global__ void LayerNormForward(const T *x, const T *scale, const T *bias, + T *y, T *mean, T *var, float epsilon, + int feature_size) { + using BlockReduce = cub::BlockReduce, BlockDim>; + __shared__ typename BlockReduce::TempStorage temp_storage; + + int beg_idx = blockIdx.x * feature_size + threadIdx.x; + int end_idx = (blockIdx.x + 1) * feature_size; + + // Step 1: Reduce to calculate mean and var + T mean_val = static_cast(0); + T var_val = static_cast(0); + for (int i = beg_idx; i < end_idx; i += BlockDim) { + T tmp = x[i]; + mean_val += tmp; + var_val += (tmp * tmp); + } + auto pair = BlockReduce(temp_storage) + .Reduce(PairForLayerNorm(mean_val, var_val), + PairForLayerNormAddFunctor()); + if (threadIdx.x == 0) { + auto tmp = pair.first_ / feature_size; + mean[blockIdx.x] = tmp; + var[blockIdx.x] = pair.second_ / feature_size - tmp * tmp; + } + __syncthreads(); + mean_val = mean[blockIdx.x]; + var_val = static_cast(real_sqrt(var[blockIdx.x] + epsilon)); + + // Step 2: Calculate y + if (scale != nullptr) { + if (bias != nullptr) { + for (int i = beg_idx, j = threadIdx.x; i < end_idx; + i += BlockDim, j += BlockDim) { + y[i] = scale[j] * (x[i] - mean_val) / var_val + bias[j]; + } + } else { + for (int i = beg_idx, j = threadIdx.x; i < end_idx; + i += BlockDim, j += BlockDim) { + y[i] = scale[j] * (x[i] - mean_val) / var_val; + } + } + } else { // scale == nullptr + if (bias != nullptr) { + for (int i = beg_idx, j = threadIdx.x; i < end_idx; + i += BlockDim, j += BlockDim) { + y[i] = (x[i] - mean_val) / var_val + bias[j]; + } + } else { + for (int i = beg_idx, j = threadIdx.x; i < end_idx; + i += BlockDim, j += BlockDim) { + y[i] = (x[i] - mean_val) / var_val; + } + } + } +} + +// Make sure that d_scale != nullptr && d_bias != nullptr +// Since d_scale != nullptr, scale would not be nullptr +template +__global__ void LayerNormBackwardGradientAll(const T *x, const T *d_y, + T *d_scale, T *d_bias, T *d_x, + const T *mean, const T *var, + const T *scale, float epsilon, + int batch_size, int feature_size) { + using BlockReduce = cub::BlockReduce, BlockDim>; + __shared__ typename BlockReduce::TempStorage temp_storage; + + int beg_idx = threadIdx.x * feature_size + blockIdx.x; + int end_idx = batch_size * feature_size + blockIdx.x; + int stride = BlockDim * feature_size; + + T d_scale_partial = 0, d_bias_partial = 0; + + for (int i = beg_idx; i < end_idx; i += stride) { + int row_idx = i / feature_size; + auto var_val = static_cast(real_sqrt(var[row_idx] + epsilon)); + d_scale_partial += d_y[i] * (x[i] - mean[row_idx]) / var_val; + d_bias_partial += d_y[i]; + if (HasDx) { + d_x[i] = d_y[i] * scale[blockIdx.x] / var_val; + } + } + + auto pair = BlockReduce(temp_storage) + .Reduce(PairForLayerNorm(d_scale_partial, d_bias_partial), + PairForLayerNormAddFunctor()); + + if (threadIdx.x == 0) { + d_scale[blockIdx.x] = pair.first_; + d_bias[blockIdx.x] = pair.second_; + } +} + +// Make sure that there is only one true expression: d_scale != nullptr +// or d_bias != nullptr +// Notice: scale may be nullptr +template +__global__ void LayerNormBackwardGradientScaleOrBias( + const T *x, const T *d_y, T *d_scale, T *d_bias, T *d_x, const T *mean, + const T *var, const T *scale, float epsilon, int batch_size, + int feature_size) { + using BlockReduce = cub::BlockReduce; + __shared__ typename BlockReduce::TempStorage temp_storage; + int beg_idx = threadIdx.x * feature_size + blockIdx.x; + int end_idx = batch_size * feature_size + blockIdx.x; + int stride = BlockDim * feature_size; + T d_scale_or_d_bias_partial = 0; + + for (int i = beg_idx; i < end_idx; i += stride) { + int row_idx = i / feature_size; + auto var_val = static_cast(real_sqrt(var[row_idx] + epsilon)); + if (HasDScale) { + d_scale_or_d_bias_partial += d_y[i] * (x[i] - mean[row_idx]) / var_val; + } else { // d_bias != nullptr + d_scale_or_d_bias_partial += d_y[i]; + } + + if (HasDx) { + if (scale != nullptr) { + d_x[i] = d_y[i] * scale[blockIdx.x] / var_val; + } else { + d_x[i] = d_y[i] / var_val; + } + } + } + + d_scale_or_d_bias_partial = + BlockReduce(temp_storage).Reduce(d_scale_or_d_bias_partial, cub::Sum()); + + if (threadIdx.x == 0) { + if (HasDScale) { + d_scale[blockIdx.x] = d_scale_or_d_bias_partial; + } else { + d_bias[blockIdx.x] = d_scale_or_d_bias_partial; + } + } +} + +template +__global__ void LayerNormBackwardPostProcessToCalculateDX(const T *x, T *d_x, + const T *mean, + const T *var, + float epsilon, + int feature_size) { + using BlockReduce = cub::BlockReduce, BlockDim>; + __shared__ typename BlockReduce::TempStorage temp_storage; + __shared__ T d_x_reduce_tmp[2]; + + int beg_idx = blockIdx.x * feature_size + threadIdx.x; + int end_idx = (blockIdx.x + 1) * feature_size; + + T block_mean = mean[blockIdx.x]; + T block_var = var[blockIdx.x]; + T d_x_mean_partial = 0, d_x_var_partial = 0; + for (int i = beg_idx; i < end_idx; i += BlockDim) { + d_x_mean_partial += d_x[i]; + d_x_var_partial += d_x[i] * (x[i] - block_mean); + } + + auto pair = + BlockReduce(temp_storage) + .Reduce(PairForLayerNorm(d_x_mean_partial, d_x_var_partial), + PairForLayerNormAddFunctor()); + + if (threadIdx.x == 0) { + d_x_reduce_tmp[0] = pair.first_ / feature_size; + d_x_reduce_tmp[1] = pair.second_ / (feature_size * (block_var + epsilon)); + } + __syncthreads(); + + d_x_mean_partial = d_x_reduce_tmp[0]; + d_x_var_partial = d_x_reduce_tmp[1]; + for (int i = beg_idx; i < end_idx; i += BlockDim) { + d_x[i] -= d_x_mean_partial; + d_x[i] -= (x[i] - block_mean) * d_x_var_partial; + } +} + +// Here, we only calculate d_x +template +__global__ void LayerNormBackwardGradientOnlyDX(const T *x, const T *d_y, + T *d_x, const T *mean, + const T *var, const T *scale, + float epsilon, + int feature_size) { + using BlockReduce = cub::BlockReduce, BlockDim>; + __shared__ typename BlockReduce::TempStorage temp_storage; + __shared__ T d_x_reduce_tmp[2]; + + int beg_idx = blockIdx.x * feature_size + threadIdx.x; + int end_idx = (blockIdx.x + 1) * feature_size; + + T block_mean = mean[blockIdx.x], block_var = var[blockIdx.x]; + T d_x_mean_partial = 0, d_x_var_partial = 0; + for (int i = beg_idx; i < end_idx; i += BlockDim) { + auto var_val = static_cast(real_sqrt(block_var + epsilon)); + if (scale != nullptr) { + int col_idx = i % feature_size; + d_x[i] = d_y[i] * scale[col_idx] / var_val; + } else { + d_x[i] = d_y[i] / var_val; + } + d_x_mean_partial += d_x[i]; + d_x_var_partial += d_x[i] * (x[i] - block_mean); + } + + auto pair = + BlockReduce(temp_storage) + .Reduce(PairForLayerNorm(d_x_mean_partial, d_x_var_partial), + PairForLayerNormAddFunctor()); + + if (threadIdx.x == 0) { + d_x_reduce_tmp[0] = pair.first_ / feature_size; + d_x_reduce_tmp[1] = pair.second_ / (feature_size * (block_var + epsilon)); + } + __syncthreads(); + + d_x_mean_partial = d_x_reduce_tmp[0]; + d_x_var_partial = d_x_reduce_tmp[1]; + for (int i = beg_idx; i < end_idx; i += BlockDim) { + d_x[i] -= d_x_mean_partial; + d_x[i] -= (x[i] - block_mean) * d_x_var_partial; + } +} + +template +__global__ void LayerNormBackwardWhenBatchSizeIsOne( + const T *x, const T *d_y, T *d_x, T *d_scale, T *d_bias, const T *mean, + const T *var, const T *scale, float epsilon, int feature_size) { + int idx = threadIdx.x + blockIdx.x * blockDim.x; + if (idx < feature_size) { + auto var_val = static_cast(real_sqrt(var[idx] + epsilon)); + if (d_x != nullptr) { + if (d_scale == nullptr) { + d_x[idx] = d_y[idx] / var_val; + } else { + d_x[idx] = d_y[idx] * scale[idx] / var_val; + } + } + + if (d_scale != nullptr) { + d_scale[idx] = d_y[idx] * (x[idx] - mean[idx]) / var_val; + } + + if (d_bias != nullptr) d_bias[idx] = d_y[idx]; + } +} + +template +static void LayerNormBackward(const T *x, const T *d_y, const T *scale, + const T *mean, const T *var, T *d_x, T *d_scale, + T *d_bias, float epsilon, int batch_size, + int feature_size, cudaStream_t stream) { + const int kMaxBlockDim = 512; + int gradient_flag = ((d_x != nullptr ? 1 : 0) << 2) | + ((d_scale != nullptr ? 1 : 0) << 1) | + ((d_bias != nullptr ? 1 : 0)); + if (gradient_flag == 0) return; + + if (batch_size == 1) { + LayerNormBackwardWhenBatchSizeIsOne< + T><<<(feature_size + kMaxBlockDim - 1) / kMaxBlockDim, kMaxBlockDim, 0, + stream>>>(x, d_y, d_x, d_scale, d_bias, mean, var, scale, epsilon, + feature_size); + + if (d_x != nullptr) { + switch (GetDesiredBlockDim(feature_size)) { + FIXED_BLOCK_DIM_CASE(LayerNormBackwardPostProcessToCalculateDX< + T, kBlockDim><<<1, kBlockDim, 0, stream>>>( + x, d_x, mean, var, epsilon, feature_size)); + } + } + return; + } + + auto block_dim = GetDesiredBlockDim(batch_size); + switch (gradient_flag) { + case 1: // d_x == nulptr, d_scale == nullptr, d_bias != nullptr + switch (block_dim) { + FIXED_BLOCK_DIM_CASE(LayerNormBackwardGradientScaleOrBias< + T, kBlockDim, false, + false><<>>( + x, d_y, d_scale, d_bias, d_x, mean, var, scale, epsilon, batch_size, + feature_size)); + } + break; + case 2: // d_x == nullptr, d_scale != nullptr, d_bias == nullptr + switch (block_dim) { + FIXED_BLOCK_DIM_CASE(LayerNormBackwardGradientScaleOrBias< + T, kBlockDim, false, + true><<>>( + x, d_y, d_scale, d_bias, d_x, mean, var, scale, epsilon, batch_size, + feature_size)); + } + break; + case 3: // d_x == nullptr, d_scale != nulptr, d_bias != nullptr + switch (block_dim) { + FIXED_BLOCK_DIM_CASE( + LayerNormBackwardGradientAll< + T, kBlockDim, false><<>>( + x, d_y, d_scale, d_bias, d_x, mean, var, scale, epsilon, + batch_size, feature_size)); + } + break; + case 4: // d_x != nullptr, d_scale == nullptr, d_bias == nullptr + switch (GetDesiredBlockDim(feature_size)) { + FIXED_BLOCK_DIM_CASE( + LayerNormBackwardGradientOnlyDX< + T, kBlockDim><<>>( + x, d_y, d_x, mean, var, scale, epsilon, feature_size)); + } + break; + case 5: // d_x != nulptr, d_scale == nullptr, d_bias != nullptr + switch (block_dim) { + FIXED_BLOCK_DIM_CASE(LayerNormBackwardGradientScaleOrBias< + T, kBlockDim, true, + false><<>>( + x, d_y, d_scale, d_bias, d_x, mean, var, scale, epsilon, batch_size, + feature_size)); + } + switch (GetDesiredBlockDim(feature_size)) { + FIXED_BLOCK_DIM_CASE( + LayerNormBackwardPostProcessToCalculateDX< + T, kBlockDim><<>>( + x, d_x, mean, var, epsilon, feature_size)); + } + break; + case 6: // d_x != nullptr, d_scale != nullptr, d_bias == nullptr + switch (block_dim) { + FIXED_BLOCK_DIM_CASE(LayerNormBackwardGradientScaleOrBias< + T, kBlockDim, true, + true><<>>( + x, d_y, d_scale, d_bias, d_x, mean, var, scale, epsilon, batch_size, + feature_size)); + } + switch (GetDesiredBlockDim(feature_size)) { + FIXED_BLOCK_DIM_CASE( + LayerNormBackwardPostProcessToCalculateDX< + T, kBlockDim><<>>( + x, d_x, mean, var, epsilon, feature_size)); + } + break; + case 7: // d_x != nullptr, d_scale != nullptr, d_bias != nullptr + switch (block_dim) { + FIXED_BLOCK_DIM_CASE( + LayerNormBackwardGradientAll< + T, kBlockDim, true><<>>( + x, d_y, d_scale, d_bias, d_x, mean, var, scale, epsilon, + batch_size, feature_size)); + } + switch (GetDesiredBlockDim(feature_size)) { + FIXED_BLOCK_DIM_CASE( + LayerNormBackwardPostProcessToCalculateDX< + T, kBlockDim><<>>( + x, d_x, mean, var, epsilon, feature_size)); + } + break; + default: + break; + } +} + +template +class LayerNormKernel + : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext &ctx) const override { + const float epsilon = ctx.Attr("epsilon"); + auto *scale = ctx.Input("Scale"); + auto *bias = ctx.Input("Bias"); + auto *x = ctx.Input("X"); + + auto *y = ctx.Output("Y"); + auto *mean = ctx.Output("Mean"); + auto *var = ctx.Output("Variance"); + const auto begin_norm_axis = ctx.Attr("begin_norm_axis"); + + const auto x_dims = x->dims(); + auto *x_data = x->data(); + auto *y_data = y->mutable_data(ctx.GetPlace()); + auto *mean_data = mean->mutable_data(ctx.GetPlace()); + auto *var_data = var->mutable_data(ctx.GetPlace()); + auto *scale_data = (scale == nullptr ? nullptr : scale->data()); + auto *bias_data = (bias == nullptr ? nullptr : bias->data()); + + auto matrix_dim = framework::flatten_to_2d(x_dims, begin_norm_axis); + int batch_size = static_cast(matrix_dim[0]); + int feature_size = static_cast(matrix_dim[1]); + + auto stream = ctx.cuda_device_context().stream(); + + switch (GetDesiredBlockDim(feature_size)) { + FIXED_BLOCK_DIM_CASE( + LayerNormForward<<>>( + x_data, scale_data, bias_data, y_data, mean_data, var_data, + epsilon, feature_size)); + default: + PADDLE_THROW( + "Product from begin_norm_axis to end must be larger than 1"); + break; + } + } +}; + +template +class LayerNormGradKernel + : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext &ctx) const override { + const float epsilon = ctx.Attr("epsilon"); + // d_x, d_scale, d_bias may be nullptr + auto *d_x = ctx.Output(framework::GradVarName("X")); + auto *d_scale = ctx.Output(framework::GradVarName("Scale")); + auto *d_bias = ctx.Output(framework::GradVarName("Bias")); + + auto *x = ctx.Input("X"); + auto *mean = ctx.Input("Mean"); + auto *var = ctx.Input("Variance"); + auto *scale = ctx.Input("Scale"); + auto *d_y = ctx.Input(framework::GradVarName("Y")); + + auto *x_data = x->data(); + auto *d_y_data = d_y->data(); + auto *mean_data = mean->data(); + auto *var_data = var->data(); + auto *scale_data = (scale == nullptr ? nullptr : scale->data()); + auto *d_scale_data = + (d_scale == nullptr ? nullptr + : d_scale->mutable_data(ctx.GetPlace())); + auto *d_bias_data = + (d_bias == nullptr ? nullptr : d_bias->mutable_data(ctx.GetPlace())); + auto *d_x_data = + (d_x == nullptr ? nullptr : d_x->mutable_data(ctx.GetPlace())); + + const auto &x_dims = x->dims(); + const auto begin_norm_axis = ctx.Attr("begin_norm_axis"); + auto matrix_dim = framework::flatten_to_2d(x_dims, begin_norm_axis); + int batch_size = static_cast(matrix_dim[0]); + int feature_size = static_cast(matrix_dim[1]); + + auto stream = ctx.cuda_device_context().stream(); + + LayerNormBackward(x_data, d_y_data, scale_data, mean_data, var_data, + d_x_data, d_scale_data, d_bias_data, epsilon, + batch_size, feature_size, stream); + } +}; + +#undef FIXED_BLOCK_DIM_CASE_BASE +#undef FIXED_BLOCK_DIM_CASE +} // namespace operators +} // namespace paddle + namespace ops = paddle::operators; REGISTER_OP_CUDA_KERNEL( layer_norm, diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index e14b148cc00f425e90b0b2256ab3462753a34f47..b1948076969e7a651179b97b107b85beea175280 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -25,10 +25,6 @@ limitations under the License. */ #include "paddle/fluid/operators/distributed/request_handler_impl.h" #include "paddle/fluid/operators/listen_and_serv_op.h" -#include "paddle/fluid/platform/profiler.h" - -DEFINE_int32(listen_and_serv_profile_period, 0, - "the period of listen_and_serv to do profile"); namespace paddle { namespace operators { @@ -108,6 +104,7 @@ void ListenAndServOp::RunSyncLoop( framework::Scope *recv_scope, const std::vector &prefetch_block_id_list, const int checkpoint_point_block_id) const { + VLOG(2) << "RunSyncLoop"; size_t num_blocks = program->Size(); auto optimize_blocks = Attr>(kOptimizeBlocks); @@ -128,17 +125,8 @@ void ListenAndServOp::RunSyncLoop( rpc_service_->ResetBarrierCounter(); - int32_t profile_step = 0; while (true) { - PADDLE_ENFORCE_LE(profile_step, FLAGS_listen_and_serv_profile_period, - "profile_step should not be larger then " - "FLAGS_listen_and_serv_profile_period"); - if (FLAGS_listen_and_serv_profile_period > 0) { - if (profile_step == 0) { - auto pf_state = paddle::platform::ProfilerState::kCPU; - paddle::platform::EnableProfiler(pf_state); - } - } + rpc_service_->Profiler().OneStep(); // Get from multiple trainers, we don't care about the order in which // the gradients arrives, just add suffix 0~n and merge the gradient. rpc_service_->SetCond(distributed::kRequestSend); @@ -180,21 +168,13 @@ void ListenAndServOp::RunSyncLoop( // reset received sparse vars to avoid reuse it in the next mini-batch dynamic_cast(request_send_handler_.get()) ->ResetSparseVarRecorder(); - if (FLAGS_listen_and_serv_profile_period > 0) { - if (profile_step == FLAGS_listen_and_serv_profile_period) { - paddle::platform::DisableProfiler( - paddle::platform::EventSortingKey::kTotal, "/dev/null"); - profile_step = 0; - } else { - profile_step++; - } - } } // while(true) } void ListenAndServOp::RunAsyncLoop(framework::Executor *executor, framework::ProgramDesc *program, framework::Scope *recv_scope) const { + VLOG(2) << "RunAsyncLoop"; // grad name to block id std::unordered_map grad_to_block_id; std::unordered_map id_to_grad; diff --git a/paddle/fluid/platform/cpu_info.cc b/paddle/fluid/platform/cpu_info.cc index 9280965af29d0f5635c015846ed65746ee3dc669..7d53a684d6068c79659719159696ef5aebfeaa2b 100644 --- a/paddle/fluid/platform/cpu_info.cc +++ b/paddle/fluid/platform/cpu_info.cc @@ -13,8 +13,11 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/platform/cpu_info.h" + +#ifdef PADDLE_WITH_XBYAK #include "xbyak/xbyak.h" #include "xbyak/xbyak_util.h" +#endif #ifdef __APPLE__ #include diff --git a/paddle/fluid/platform/device_tracer.cc b/paddle/fluid/platform/device_tracer.cc index 8fa8dbd67c936439840cffa073b6fa6693dd31a1..dc1d751141187edb7738e42c41514614d4d399b0 100644 --- a/paddle/fluid/platform/device_tracer.cc +++ b/paddle/fluid/platform/device_tracer.cc @@ -189,6 +189,8 @@ void CUPTIAPI bufferCompleted(CUcontext ctx, uint32_t streamId, uint8_t *buffer, } } // namespace +#endif // PADDLE_WITH_CUPTI + class DeviceTracerImpl : public DeviceTracer { public: DeviceTracerImpl() : enabled_(false) {} @@ -244,6 +246,8 @@ class DeviceTracerImpl : public DeviceTracer { if (enabled_) { return; } + +#ifdef PADDLE_WITH_CUPTI EnableActivity(); // Register callbacks for buffer requests and completed by CUPTI. @@ -262,6 +266,7 @@ class DeviceTracerImpl : public DeviceTracer { dynload::cuptiEnableCallback(1, subscriber_, CUPTI_CB_DOMAIN_DRIVER_API, CUPTI_DRIVER_TRACE_CBID_cuLaunchKernel)); CUPTI_CALL(dynload::cuptiGetTimestamp(&start_ns_)); +#endif // PADDLE_WITH_CUPTI enabled_ = true; } @@ -313,16 +318,21 @@ class DeviceTracerImpl : public DeviceTracer { } void Disable() { +#ifdef PADDLE_WITH_CUPTI // flush might cause additional calls to DeviceTracker. dynload::cuptiActivityFlushAll(CUPTI_ACTIVITY_FLAG_FLUSH_FORCED); +#endif // PADDLE_WITH_CUPTI std::lock_guard l(trace_mu_); +#ifdef PADDLE_WITH_CUPTI DisableActivity(); dynload::cuptiUnsubscribe(subscriber_); CUPTI_CALL(dynload::cuptiGetTimestamp(&end_ns_)); +#endif // PADDLE_WITH_CUPTI enabled_ = false; } private: +#ifdef PADDLE_WITH_CUPTI static void CUPTIAPI ApiCallback(void *userdata, CUpti_CallbackDomain domain, CUpti_CallbackId cbid, const void *cbdata) { auto *cbInfo = reinterpret_cast(cbdata); @@ -340,7 +350,8 @@ class DeviceTracerImpl : public DeviceTracer { VLOG(1) << "Unhandled API Callback for " << domain << " " << cbid; } } - + CUpti_SubscriberHandle subscriber_; +#endif // PADDLE_WITH_CUPTI std::mutex trace_mu_; bool enabled_; uint64_t start_ns_; @@ -349,45 +360,9 @@ class DeviceTracerImpl : public DeviceTracer { std::vector mem_records_; std::vector cpu_records_; std::unordered_map correlations_; - CUpti_SubscriberHandle subscriber_; -}; - -#endif // PADDLE_WITH_CUPTI - -class DeviceTracerDummy : public DeviceTracer { - public: - DeviceTracerDummy() {} - - void AddAnnotation(uint64_t id, const std::string &anno) {} - - void AddCPURecords(const std::string &anno, uint64_t start_ns, - uint64_t end_ns, int64_t device_id, int64_t thread_id) {} - - void AddMemRecords(const std::string &name, uint64_t start_ns, - uint64_t end_ns, int64_t device_id, int64_t stream_id, - uint32_t correlation_id, uint64_t bytes) {} - - void AddKernelRecords(uint64_t start, uint64_t end, int64_t device_id, - int64_t stream_id, uint32_t correlation_id) {} - - bool IsEnabled() { return false; } - - void Enable() {} - - proto::Profile GenProfile(const std::string &profile_path) { - return proto::Profile(); - } - - void Disable() {} }; -void CreateTracer(DeviceTracer **t) { -#ifdef PADDLE_WITH_CUPTI - *t = new DeviceTracerImpl(); -#else - *t = new DeviceTracerDummy(); -#endif // PADDLE_WITH_CUPTI -} +void CreateTracer(DeviceTracer **t) { *t = new DeviceTracerImpl(); } DeviceTracer *GetDeviceTracer() { std::call_once(tracer_once_flag, CreateTracer, &tracer); diff --git a/paddle/fluid/platform/device_tracer.h b/paddle/fluid/platform/device_tracer.h index d2a571f4345b544ad5e74f4629c3967593d6d628..322996fb4f54d34ebbb034a6e1de420e9c532545 100644 --- a/paddle/fluid/platform/device_tracer.h +++ b/paddle/fluid/platform/device_tracer.h @@ -13,6 +13,9 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once +#include +#include +#include // NOLINT #include #include "paddle/fluid/platform/dynload/cupti.h" @@ -25,6 +28,12 @@ namespace platform { // WARN: Under Development. Don't depend on it yet. ////////////////////// +inline uint64_t PosixInNsec() { + struct timeval tv; + gettimeofday(&tv, nullptr); + return 1000 * (static_cast(tv.tv_sec) * 1000000 + tv.tv_usec); +} + // DeviceTracer performs the following tasks: // 1. Register cuda callbacks for various events: kernel, memcpy, etc. // 2. Collect cuda statistics: start/end ts, memory, etc. diff --git a/paddle/fluid/platform/profiler.cc b/paddle/fluid/platform/profiler.cc index 7c8d8a5964fa5258bebaf2c8522886ae5886ab2c..d0286719b9ea1aa671294f519051ac1e269c4e93 100644 --- a/paddle/fluid/platform/profiler.cc +++ b/paddle/fluid/platform/profiler.cc @@ -15,7 +15,6 @@ limitations under the License. */ #include "paddle/fluid/platform/profiler.h" #include -#include #include #include #include @@ -97,12 +96,6 @@ inline uint64_t GetTimeInNsec() { .count(); } -inline uint64_t PosixInNsec() { - struct timeval tv; - gettimeofday(&tv, nullptr); - return 1000 * (static_cast(tv.tv_sec) * 1000000 + tv.tv_usec); -} - Event::Event(EventType type, std::string name, uint32_t thread_id, const DeviceContext* dev_ctx) : type_(type), name_(name), thread_id_(thread_id), has_cuda_(false) { diff --git a/python/paddle/dataset/wmt14.py b/python/paddle/dataset/wmt14.py index aaae4b3588ae79cad48804ebe317ca1e6eecc3b8..ecd39a79f1367d3ffde36bc500978aaa3b3d6946 100644 --- a/python/paddle/dataset/wmt14.py +++ b/python/paddle/dataset/wmt14.py @@ -38,8 +38,7 @@ URL_DEV_TEST = ('http://www-lium.univ-lemans.fr/~schwenk/' MD5_DEV_TEST = '7d7897317ddd8ba0ae5c5fa7248d3ff5' # this is a small set of data for test. The original data is too large and # will be add later. -URL_TRAIN = ('http://paddlepaddle.cdn.bcebos.com/demo/' - 'wmt_shrinked_data/wmt14.tgz') +URL_TRAIN = ('http://paddlemodels.bj.bcebos.com/wmt/wmt14.tgz') MD5_TRAIN = '0791583d57d5beb693b9414c5b36798c' # BLEU of this trained model is 26.92 URL_MODEL = 'http://paddlemodels.bj.bcebos.com/wmt%2Fwmt14.tgz' diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index 2ab176ec272a9f250d5229ae67faf350e5eda72c..cccb4abe6cf14ac3e90ba59d970c9398b09b7db2 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -128,7 +128,8 @@ def __bootstrap__(): ] if core.is_compiled_with_dist(): read_env_flags.append('rpc_deadline') - read_env_flags.append('listen_and_serv_profile_period') + read_env_flags.append('rpc_server_profile_period') + read_env_flags.append('rpc_server_profile_path') if core.is_compiled_with_cuda(): read_env_flags += [ diff --git a/python/paddle/fluid/layers/detection.py b/python/paddle/fluid/layers/detection.py index 0de66d11447581e68b2559fdb8d91c7f0d464dde..4f317a40309a4267330a3e89727bc22088fec6e5 100644 --- a/python/paddle/fluid/layers/detection.py +++ b/python/paddle/fluid/layers/detection.py @@ -166,7 +166,7 @@ def rpn_target_assign(loc, }) # 4. Reshape and gather the target entry - scores = nn.reshape(x=scores, shape=(-1, 1)) + scores = nn.reshape(x=scores, shape=(-1, 2)) loc = nn.reshape(x=loc, shape=(-1, 4)) target_label = nn.reshape(x=target_label, shape=(-1, 1)) target_bbox = nn.reshape(x=target_bbox, shape=(-1, 4)) @@ -724,7 +724,7 @@ def ssd_loss(location, }, attrs={ 'neg_pos_ratio': neg_pos_ratio, - 'neg_dist_threshold': neg_pos_ratio, + 'neg_dist_threshold': neg_overlap, 'mining_type': mining_type, 'sample_size': sample_size, }) diff --git a/python/paddle/fluid/profiler.py b/python/paddle/fluid/profiler.py index 5fbb35abddb25e92ef53fe22af372b497f5a63f0..5bbbdf7fe719b027a89ec980c42e736d25f6c264 100644 --- a/python/paddle/fluid/profiler.py +++ b/python/paddle/fluid/profiler.py @@ -219,7 +219,7 @@ def stop_profiler(sorted_key=None, profile_path='/tmp/profile'): def profiler(state, sorted_key=None, profile_path='/tmp/profile'): """The profiler interface. Different from cuda_profiler, this profiler can be used to profile both CPU - and GPU program. By defalut, it records the CPU and GPU operator kernels, + and GPU program. By default, it records the CPU and GPU operator kernels, if you want to profile other program, you can refer the profiling tutorial to add more records in C++ code. @@ -232,7 +232,7 @@ def profiler(state, sorted_key=None, profile_path='/tmp/profile'): state (string) : The profiling state, which should be 'CPU' or 'GPU', telling the profiler to use CPU timer or GPU timer for profiling. Although users may have already specified the execution place - (CPUPlace/CUDAPlace) in the begining, for flexibility the profiler + (CPUPlace/CUDAPlace) in the beginning, for flexibility the profiler would not inherit this place. sorted_key (string) : If None, the profiling results will be printed in the order of first end time of events. Otherwise, the profiling diff --git a/python/paddle/fluid/tests/unittests/dist_mnist.py b/python/paddle/fluid/tests/unittests/dist_mnist.py new file mode 100644 index 0000000000000000000000000000000000000000..8f5ba33f7cbf5286edc4503c219fd3cdff60c517 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/dist_mnist.py @@ -0,0 +1,103 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import argparse +import time +import math + +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler +from paddle.fluid import core +import unittest +from multiprocessing import Process +import os +import signal +from functools import reduce +from test_dist_base import TestDistRunnerBase, runtime_main + +DTYPE = "float32" +paddle.dataset.mnist.fetch() + +# Fix seed for test +fluid.default_startup_program().random_seed = 1 +fluid.default_main_program().random_seed = 1 + + +def cnn_model(data): + conv_pool_1 = fluid.nets.simple_img_conv_pool( + input=data, + filter_size=5, + num_filters=20, + pool_size=2, + pool_stride=2, + act="relu", + param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant())) + conv_pool_2 = fluid.nets.simple_img_conv_pool( + input=conv_pool_1, + filter_size=5, + num_filters=50, + pool_size=2, + pool_stride=2, + act="relu", + param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant())) + + SIZE = 10 + input_shape = conv_pool_2.shape + param_shape = [reduce(lambda a, b: a * b, input_shape[1:], 1)] + [SIZE] + scale = (2.0 / (param_shape[0]**2 * SIZE))**0.5 + + predict = fluid.layers.fc( + input=conv_pool_2, + size=SIZE, + act="softmax", + param_attr=fluid.param_attr.ParamAttr( + initializer=fluid.initializer.NormalInitializer( + loc=0.0, scale=scale, seed=1))) + return predict + + +class TestDistMnist2x2(TestDistRunnerBase): + def get_model(self, batch_size=2): + # Input data + images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE) + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + # Train program + predict = cnn_model(images) + cost = fluid.layers.cross_entropy(input=predict, label=label) + avg_cost = fluid.layers.mean(x=cost) + + # Evaluator + batch_size_tensor = fluid.layers.create_tensor(dtype='int64') + batch_acc = fluid.layers.accuracy( + input=predict, label=label, total=batch_size_tensor) + + inference_program = fluid.default_main_program().clone() + # Optimization + opt = fluid.optimizer.AdamOptimizer( + learning_rate=0.001, beta1=0.9, beta2=0.999) + + # Reader + train_reader = paddle.batch( + paddle.dataset.mnist.train(), batch_size=batch_size) + test_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=batch_size) + opt.minimize(avg_cost) + return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict + + +if __name__ == "__main__": + runtime_main(TestDistMnist2x2) diff --git a/python/paddle/fluid/tests/unittests/dist_se_resnext.py b/python/paddle/fluid/tests/unittests/dist_se_resnext.py index e53e1e6dc12fc40684e9827f8dddecc0d834a064..f9ac6612dffb3a9678328277008649a074dd6887 100644 --- a/python/paddle/fluid/tests/unittests/dist_se_resnext.py +++ b/python/paddle/fluid/tests/unittests/dist_se_resnext.py @@ -27,6 +27,7 @@ from multiprocessing import Process import os import sys import signal +from test_dist_base import TestDistRunnerBase, runtime_main # Fix seed for test fluid.default_startup_program().random_seed = 1 @@ -196,161 +197,52 @@ class SE_ResNeXt(): return scale -def get_model(batch_size): - # Input data - image = fluid.layers.data(name="data", shape=[3, 224, 224], dtype='float32') - label = fluid.layers.data(name="int64", shape=[1], dtype='int64') +class DistSeResneXt2x2(TestDistRunnerBase): + def get_model(self, batch_size=2): + # Input data + image = fluid.layers.data( + name="data", shape=[3, 224, 224], dtype='float32') + label = fluid.layers.data(name="int64", shape=[1], dtype='int64') - # Train program - model = SE_ResNeXt(layers=50) - out = model.net(input=image, class_dim=102) - cost = fluid.layers.cross_entropy(input=out, label=label) + # Train program + model = SE_ResNeXt(layers=50) + out = model.net(input=image, class_dim=102) + cost = fluid.layers.cross_entropy(input=out, label=label) - avg_cost = fluid.layers.mean(x=cost) - acc_top1 = fluid.layers.accuracy(input=out, label=label, k=1) - acc_top5 = fluid.layers.accuracy(input=out, label=label, k=5) + avg_cost = fluid.layers.mean(x=cost) + acc_top1 = fluid.layers.accuracy(input=out, label=label, k=1) + acc_top5 = fluid.layers.accuracy(input=out, label=label, k=5) - # Evaluator - test_program = fluid.default_main_program().clone(for_test=True) + # Evaluator + test_program = fluid.default_main_program().clone(for_test=True) - # Optimization - total_images = 6149 # flowers - epochs = [30, 60, 90] - step = int(total_images / batch_size + 1) + # Optimization + total_images = 6149 # flowers + epochs = [30, 60, 90] + step = int(total_images / batch_size + 1) - bd = [step * e for e in epochs] - base_lr = 0.1 - lr = [] - lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] + bd = [step * e for e in epochs] + base_lr = 0.1 + lr = [] + lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] - optimizer = fluid.optimizer.Momentum( - # FIXME(typhoonzero): add back LR decay once ParallelExecutor fixed. - #learning_rate=fluid.layers.piecewise_decay( - # boundaries=bd, values=lr), - learning_rate=base_lr, - momentum=0.9, - regularization=fluid.regularizer.L2Decay(1e-4)) - optimizer.minimize(avg_cost) + optimizer = fluid.optimizer.Momentum( + # FIXME(typhoonzero): add back LR decay once ParallelExecutor fixed. + #learning_rate=fluid.layers.piecewise_decay( + # boundaries=bd, values=lr), + learning_rate=base_lr, + momentum=0.9, + regularization=fluid.regularizer.L2Decay(1e-4)) + optimizer.minimize(avg_cost) - # Reader - train_reader = paddle.batch( - paddle.dataset.flowers.train(), batch_size=batch_size) - test_reader = paddle.batch( - paddle.dataset.flowers.test(use_xmap=False), batch_size=batch_size) + # Reader + train_reader = paddle.batch( + paddle.dataset.flowers.train(), batch_size=batch_size) + test_reader = paddle.batch( + paddle.dataset.flowers.test(use_xmap=False), batch_size=batch_size) - return test_program, avg_cost, train_reader, test_reader, acc_top1, out - - -def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers): - t = fluid.DistributeTranspiler() - t.transpile( - trainer_id=trainer_id, - program=main_program, - pservers=pserver_endpoints, - trainers=trainers) - return t - - -class DistSeResneXt2x2: - def run_pserver(self, pserver_endpoints, trainers, current_endpoint, - trainer_id): - get_model(batch_size=2) - t = get_transpiler(trainer_id, - fluid.default_main_program(), pserver_endpoints, - trainers) - pserver_prog = t.get_pserver_program(current_endpoint) - startup_prog = t.get_startup_program(current_endpoint, pserver_prog) - place = fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(startup_prog) - exe.run(pserver_prog) - - def _wait_ps_ready(self, pid): - retry_times = 20 - while True: - assert retry_times >= 0, "wait ps ready failed" - time.sleep(3) - print("waiting ps ready: ", pid) - try: - # the listen_and_serv_op would touch a file which contains the listen port - # on the /tmp directory until it was ready to process all the RPC call. - os.stat("/tmp/paddle.%d.port" % pid) - return - except os.error: - retry_times -= 1 - - def run_trainer(self, place, endpoints, trainer_id, trainers, is_dist=True): - test_program, avg_cost, train_reader, test_reader, batch_acc, predict = get_model( - batch_size=2) - if is_dist: - t = get_transpiler(trainer_id, - fluid.default_main_program(), endpoints, - trainers) - trainer_prog = t.get_trainer_program() - else: - trainer_prog = fluid.default_main_program() - - startup_exe = fluid.Executor(place) - startup_exe.run(fluid.default_startup_program()) - - strategy = fluid.ExecutionStrategy() - strategy.num_threads = 1 - strategy.allow_op_delay = False - exe = fluid.ParallelExecutor( - True, loss_name=avg_cost.name, exec_strategy=strategy) - - feed_var_list = [ - var for var in trainer_prog.global_block().vars.values() - if var.is_data - ] - - feeder = fluid.DataFeeder(feed_var_list, place) - reader_generator = test_reader() - - data = next(reader_generator) - first_loss, = exe.run(fetch_list=[avg_cost.name], - feed=feeder.feed(data)) - print(first_loss) - - for i in six.moves.xrange(5): - data = next(reader_generator) - loss, = exe.run(fetch_list=[avg_cost.name], feed=feeder.feed(data)) - - data = next(reader_generator) - last_loss, = exe.run(fetch_list=[avg_cost.name], feed=feeder.feed(data)) - print(last_loss) - - -def main(role="pserver", - endpoints="127.0.0.1:9123", - trainer_id=0, - current_endpoint="127.0.0.1:9123", - trainers=1, - is_dist=True): - model = DistSeResneXt2x2() - if role == "pserver": - model.run_pserver(endpoints, trainers, current_endpoint, trainer_id) - else: - p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda( - ) else fluid.CPUPlace() - model.run_trainer(p, endpoints, trainer_id, trainers, is_dist) + return test_program, avg_cost, train_reader, test_reader, acc_top1, out if __name__ == "__main__": - if len(sys.argv) != 7: - print( - "Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]" - ) - role = sys.argv[1] - endpoints = sys.argv[2] - trainer_id = int(sys.argv[3]) - current_endpoint = sys.argv[4] - trainers = int(sys.argv[5]) - is_dist = True if sys.argv[6] == "TRUE" else False - main( - role=role, - endpoints=endpoints, - trainer_id=trainer_id, - current_endpoint=current_endpoint, - trainers=trainers, - is_dist=is_dist) + runtime_main(DistSeResneXt2x2) diff --git a/python/paddle/fluid/tests/unittests/dist_word2vec.py b/python/paddle/fluid/tests/unittests/dist_word2vec.py new file mode 100644 index 0000000000000000000000000000000000000000..54a70f4adb4a9bb24e3c618a7fe71f42a376609b --- /dev/null +++ b/python/paddle/fluid/tests/unittests/dist_word2vec.py @@ -0,0 +1,119 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import argparse +import time +import math +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler +from paddle.fluid import core +import unittest +from multiprocessing import Process +import os +import signal +from test_dist_base import TestDistRunnerBase, runtime_main + +IS_SPARSE = True +EMBED_SIZE = 32 +HIDDEN_SIZE = 256 +N = 5 + +# Fix seed for test +fluid.default_startup_program().random_seed = 1 +fluid.default_main_program().random_seed = 1 + + +class TestDistWord2vec2x2(TestDistRunnerBase): + def get_model(self, batch_size=2): + BATCH_SIZE = batch_size + + def __network__(words): + embed_first = fluid.layers.embedding( + input=words[0], + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr=fluid.ParamAttr( + name='shared_w', initializer=fluid.initializer.Constant())) + embed_second = fluid.layers.embedding( + input=words[1], + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr=fluid.ParamAttr( + name='shared_w', initializer=fluid.initializer.Constant())) + embed_third = fluid.layers.embedding( + input=words[2], + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr=fluid.ParamAttr( + name='shared_w', initializer=fluid.initializer.Constant())) + embed_forth = fluid.layers.embedding( + input=words[3], + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr=fluid.ParamAttr( + name='shared_w', initializer=fluid.initializer.Constant())) + + concat_embed = fluid.layers.concat( + input=[embed_first, embed_second, embed_third, embed_forth], + axis=1) + hidden1 = fluid.layers.fc( + input=concat_embed, + size=HIDDEN_SIZE, + act='sigmoid', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant())) + predict_word = fluid.layers.fc( + input=hidden1, + size=dict_size, + act='softmax', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant())) + cost = fluid.layers.cross_entropy( + input=predict_word, label=words[4]) + avg_cost = fluid.layers.mean(cost) + return avg_cost, predict_word + + word_dict = paddle.dataset.imikolov.build_dict() + dict_size = len(word_dict) + + first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64') + second_word = fluid.layers.data( + name='secondw', shape=[1], dtype='int64') + third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64') + forth_word = fluid.layers.data(name='forthw', shape=[1], dtype='int64') + next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64') + avg_cost, predict_word = __network__( + [first_word, second_word, third_word, forth_word, next_word]) + + inference_program = paddle.fluid.default_main_program().clone() + + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) + sgd_optimizer.minimize(avg_cost) + + train_reader = paddle.batch( + paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE) + test_reader = paddle.batch( + paddle.dataset.imikolov.test(word_dict, N), BATCH_SIZE) + + return inference_program, avg_cost, train_reader, test_reader, None, predict_word + + +if __name__ == "__main__": + runtime_main(TestDistWord2vec2x2) diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index 39b304c714428173b3c15cf8a0109330b612a45c..618c910b71b2cc6f22e55f7ba1772e71de42e71e 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -19,6 +19,109 @@ import sys import six import signal import subprocess +import six + + +class TestDistRunnerBase(object): + def get_model(self, batch_size=2): + raise NotImplementedError( + "get_model should be implemented by child classes.") + + def get_transpiler(self, trainer_id, main_program, pserver_endpoints, + trainers): + # NOTE: import fluid until runtime, or else forking processes will cause error. + import paddle + import paddle.fluid as fluid + t = fluid.DistributeTranspiler() + t.transpile( + trainer_id=trainer_id, + program=main_program, + pservers=pserver_endpoints, + trainers=trainers) + return t + + def run_pserver(self, pserver_endpoints, trainers, current_endpoint, + trainer_id): + import paddle + import paddle.fluid as fluid + self.get_model(batch_size=2) + t = self.get_transpiler(trainer_id, + fluid.default_main_program(), pserver_endpoints, + trainers) + pserver_prog = t.get_pserver_program(current_endpoint) + startup_prog = t.get_startup_program(current_endpoint, pserver_prog) + place = fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(startup_prog) + exe.run(pserver_prog) + + def run_trainer(self, place, endpoints, trainer_id, trainers, is_dist=True): + import paddle + import paddle.fluid as fluid + test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \ + self.get_model(batch_size=2) + if is_dist: + t = self.get_transpiler(trainer_id, + fluid.default_main_program(), endpoints, + trainers) + trainer_prog = t.get_trainer_program() + else: + trainer_prog = fluid.default_main_program() + + startup_exe = fluid.Executor(place) + startup_exe.run(fluid.default_startup_program()) + + strategy = fluid.ExecutionStrategy() + strategy.num_threads = 1 + strategy.allow_op_delay = False + exe = fluid.ParallelExecutor( + True, loss_name=avg_cost.name, exec_strategy=strategy) + + feed_var_list = [ + var for var in trainer_prog.global_block().vars.values() + if var.is_data + ] + + feeder = fluid.DataFeeder(feed_var_list, place) + reader_generator = test_reader() + + data = next(reader_generator) + first_loss, = exe.run(fetch_list=[avg_cost.name], + feed=feeder.feed(data)) + print(first_loss) + + for i in six.moves.xrange(5): + data = next(reader_generator) + loss, = exe.run(fetch_list=[avg_cost.name], feed=feeder.feed(data)) + + data = next(reader_generator) + last_loss, = exe.run(fetch_list=[avg_cost.name], feed=feeder.feed(data)) + print(last_loss) + + +def runtime_main(test_class): + import paddle + import paddle.fluid as fluid + import paddle.fluid.core as core + + if len(sys.argv) != 7: + print( + "Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]" + ) + role = sys.argv[1] + endpoints = sys.argv[2] + trainer_id = int(sys.argv[3]) + current_endpoint = sys.argv[4] + trainers = int(sys.argv[5]) + is_dist = True if sys.argv[6] == "TRUE" else False + + model = test_class() + if role == "pserver": + model.run_pserver(endpoints, trainers, current_endpoint, trainer_id) + else: + p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda( + ) else fluid.CPUPlace() + model.run_trainer(p, endpoints, trainer_id, trainers, is_dist) import paddle.fluid.compat as cpt @@ -130,12 +233,10 @@ class TestDistBase(unittest.TestCase): local_first_loss = eval(local_lines[0])[0] local_last_loss = eval(local_lines[1])[0] - self.assertAlmostEqual(local_first_loss, dist_first_loss, delta=delta) - self.assertAlmostEqual(local_last_loss, dist_last_loss, delta=delta) - - # check tr0_out - # FIXME: ensure the server process is killed - # replace with ps0.terminate() + # FIXME: use terminate() instead of sigkill. os.kill(ps0.pid, signal.SIGKILL) os.kill(ps1.pid, signal.SIGKILL) FNULL.close() + + self.assertAlmostEqual(local_first_loss, dist_first_loss, delta=delta) + self.assertAlmostEqual(local_last_loss, dist_last_loss, delta=delta) diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist.py b/python/paddle/fluid/tests/unittests/test_dist_mnist.py index a6fcbd977f1af9d452dfd8367efef706cd566149..b3ccec9a7d65de57778a1f013465d41a5a267676 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_dist_mnist.py @@ -11,200 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import numpy as np -import argparse -import time -import math - -import paddle -import paddle.fluid as fluid -import paddle.fluid.profiler as profiler -from paddle.fluid import core import unittest -from multiprocessing import Process -import os -import signal -from functools import reduce - -SEED = 1 -DTYPE = "float32" -paddle.dataset.mnist.fetch() - - -# random seed must set before configuring the network. -# fluid.default_startup_program().random_seed = SEED -def cnn_model(data): - conv_pool_1 = fluid.nets.simple_img_conv_pool( - input=data, - filter_size=5, - num_filters=20, - pool_size=2, - pool_stride=2, - act="relu") - conv_pool_2 = fluid.nets.simple_img_conv_pool( - input=conv_pool_1, - filter_size=5, - num_filters=50, - pool_size=2, - pool_stride=2, - act="relu") - - # TODO(dzhwinter) : refine the initializer and random seed settting - SIZE = 10 - input_shape = conv_pool_2.shape - param_shape = [reduce(lambda a, b: a * b, input_shape[1:], 1)] + [SIZE] - scale = (2.0 / (param_shape[0]**2 * SIZE))**0.5 - - predict = fluid.layers.fc( - input=conv_pool_2, - size=SIZE, - act="softmax", - param_attr=fluid.param_attr.ParamAttr( - initializer=fluid.initializer.NormalInitializer( - loc=0.0, scale=scale))) - return predict - - -def get_model(batch_size): - # Input data - images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE) - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - - # Train program - predict = cnn_model(images) - cost = fluid.layers.cross_entropy(input=predict, label=label) - avg_cost = fluid.layers.mean(x=cost) - - # Evaluator - batch_size_tensor = fluid.layers.create_tensor(dtype='int64') - batch_acc = fluid.layers.accuracy( - input=predict, label=label, total=batch_size_tensor) - - inference_program = fluid.default_main_program().clone() - # Optimization - opt = fluid.optimizer.AdamOptimizer( - learning_rate=0.001, beta1=0.9, beta2=0.999) - - # Reader - train_reader = paddle.batch( - paddle.dataset.mnist.train(), batch_size=batch_size) - test_reader = paddle.batch( - paddle.dataset.mnist.test(), batch_size=batch_size) - opt.minimize(avg_cost) - return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict - - -def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers): - t = fluid.DistributeTranspiler() - t.transpile( - trainer_id=trainer_id, - program=main_program, - pservers=pserver_endpoints, - trainers=trainers) - return t - - -def run_pserver(pserver_endpoints, trainers, current_endpoint): - get_model(batch_size=20) - t = get_transpiler(0, - fluid.default_main_program(), pserver_endpoints, - trainers) - pserver_prog = t.get_pserver_program(current_endpoint) - startup_prog = t.get_startup_program(current_endpoint, pserver_prog) - - place = fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(startup_prog) - - exe.run(pserver_prog) - - -class TestDistMnist(unittest.TestCase): - def setUp(self): - self._trainers = 1 - self._pservers = 1 - self._ps_endpoints = "127.0.0.1:9123" - - def start_pserver(self, endpoint): - p = Process( - target=run_pserver, - args=(self._ps_endpoints, self._trainers, endpoint)) - p.start() - return p.pid - - def _wait_ps_ready(self, pid): - retry_times = 5 - while True: - assert retry_times >= 0, "wait ps ready failed" - time.sleep(1) - try: - # the listen_and_serv_op would touch a file which contains the listen port - # on the /tmp directory until it was ready to process all the RPC call. - os.stat("/tmp/paddle.%d.port" % pid) - return - except os.error: - retry_times -= 1 - - def stop_pserver(self, pid): - os.kill(pid, signal.SIGTERM) - - def test_with_place(self): - p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda( - ) else fluid.CPUPlace() - - pserver_pid = self.start_pserver(self._ps_endpoints) - self._wait_ps_ready(pserver_pid) - - self.run_trainer(p, 0) - - self.stop_pserver(pserver_pid) - - def run_trainer(self, place, trainer_id): - test_program, avg_cost, train_reader, test_reader, batch_acc, predict = get_model( - batch_size=20) - t = get_transpiler(trainer_id, - fluid.default_main_program(), self._ps_endpoints, - self._trainers) - - trainer_prog = t.get_trainer_program() - - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - - feed_var_list = [ - var for var in trainer_prog.global_block().vars.values() - if var.is_data - ] +from test_dist_base import TestDistBase - feeder = fluid.DataFeeder(feed_var_list, place) - for pass_id in range(10): - for batch_id, data in enumerate(train_reader()): - exe.run(trainer_prog, feed=feeder.feed(data)) - if (batch_id + 1) % 10 == 0: - acc_set = [] - avg_loss_set = [] - for test_data in test_reader(): - acc_np, avg_loss_np = exe.run( - program=test_program, - feed=feeder.feed(test_data), - fetch_list=[batch_acc, avg_cost]) - acc_set.append(float(acc_np)) - avg_loss_set.append(float(avg_loss_np)) - # get test acc and loss - acc_val = np.array(acc_set).mean() - avg_loss_val = np.array(avg_loss_set).mean() - if float(acc_val - ) > 0.8: # Smaller value to increase CI speed - return - else: - print( - 'PassID {0:1}, BatchID {1:04}, Test Loss {2:2.2}, Acc {3:2.2}'. - format(pass_id, batch_id + 1, - float(avg_loss_val), float(acc_val))) - if math.isnan(float(avg_loss_val)): - assert ("got Nan loss, training failed.") +class TestDistSeResneXt2x2(TestDistBase): + def test_se_resnext(self): + self.check_with_place("dist_mnist.py", delta=1e-7) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py b/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py index f3a5fd6985bab1d04f6e1484534367548f383dfb..a33a338fc11e4301a8ec0eb565686d62b547b7f7 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py +++ b/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py @@ -17,7 +17,7 @@ from test_dist_base import TestDistBase class TestDistSeResneXt2x2(TestDistBase): def test_se_resnext(self): - self.check_with_place("dist_se_resnext.py") + self.check_with_place("dist_se_resnext.py", delta=1e-7) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_dist_word2vec.py b/python/paddle/fluid/tests/unittests/test_dist_word2vec.py index 4bb3998f891959f8270dc4b25821f23f1e6195e0..543d0f9dc2c9b8cdcfaaaa14a7a4f197d210d951 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_word2vec.py +++ b/python/paddle/fluid/tests/unittests/test_dist_word2vec.py @@ -11,192 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import numpy as np -import argparse -import time -import math -import paddle -import paddle.fluid as fluid -import paddle.fluid.profiler as profiler -from paddle.fluid import core import unittest -from multiprocessing import Process -import os -import signal - -IS_SPARSE = True -EMBED_SIZE = 32 -HIDDEN_SIZE = 256 -N = 5 -BATCH_SIZE = 32 -ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy - - -def get_model(): - def __network__(words): - embed_first = fluid.layers.embedding( - input=words[0], - size=[dict_size, EMBED_SIZE], - dtype='float32', - is_sparse=IS_SPARSE, - param_attr='shared_w') - embed_second = fluid.layers.embedding( - input=words[1], - size=[dict_size, EMBED_SIZE], - dtype='float32', - is_sparse=IS_SPARSE, - param_attr='shared_w') - embed_third = fluid.layers.embedding( - input=words[2], - size=[dict_size, EMBED_SIZE], - dtype='float32', - is_sparse=IS_SPARSE, - param_attr='shared_w') - embed_forth = fluid.layers.embedding( - input=words[3], - size=[dict_size, EMBED_SIZE], - dtype='float32', - is_sparse=IS_SPARSE, - param_attr='shared_w') - - concat_embed = fluid.layers.concat( - input=[embed_first, embed_second, embed_third, embed_forth], axis=1) - hidden1 = fluid.layers.fc(input=concat_embed, - size=HIDDEN_SIZE, - act='sigmoid') - predict_word = fluid.layers.fc(input=hidden1, - size=dict_size, - act='softmax') - cost = fluid.layers.cross_entropy(input=predict_word, label=words[4]) - avg_cost = fluid.layers.mean(cost) - return avg_cost, predict_word - - word_dict = paddle.dataset.imikolov.build_dict() - dict_size = len(word_dict) - - first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64') - second_word = fluid.layers.data(name='secondw', shape=[1], dtype='int64') - third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64') - forth_word = fluid.layers.data(name='forthw', shape=[1], dtype='int64') - next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64') - avg_cost, predict_word = __network__( - [first_word, second_word, third_word, forth_word, next_word]) - - inference_program = paddle.fluid.default_main_program().clone() - - sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) - sgd_optimizer.minimize(avg_cost) - - train_reader = paddle.batch( - paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE) - test_reader = paddle.batch( - paddle.dataset.imikolov.test(word_dict, N), BATCH_SIZE) - - return inference_program, avg_cost, train_reader, test_reader, predict_word - - -def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers): - t = fluid.DistributeTranspiler() - t.transpile( - trainer_id=trainer_id, - program=main_program, - pservers=pserver_endpoints, - trainers=trainers) - return t - - -def run_pserver(pserver_endpoints, trainers, current_endpoint): - get_model() - t = get_transpiler(0, - fluid.default_main_program(), pserver_endpoints, - trainers) - pserver_prog = t.get_pserver_program(current_endpoint) - startup_prog = t.get_startup_program(current_endpoint, pserver_prog) - - place = fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(startup_prog) - - exe.run(pserver_prog) - - -class TestDistMnist(unittest.TestCase): - def setUp(self): - self._trainers = 1 - self._pservers = 1 - self._ps_endpoints = "127.0.0.1:9123" - - def start_pserver(self, endpoint): - p = Process( - target=run_pserver, - args=(self._ps_endpoints, self._trainers, endpoint)) - p.start() - return p.pid - - def _wait_ps_ready(self, pid): - retry_times = 5 - while True: - assert retry_times >= 0, "wait ps ready failed" - time.sleep(1) - try: - # the listen_and_serv_op would touch a file which contains the listen port - # on the /tmp directory until it was ready to process all the RPC call. - os.stat("/tmp/paddle.%d.port" % pid) - return - except os.error: - retry_times -= 1 - - def stop_pserver(self, pid): - os.kill(pid, signal.SIGKILL) - - def test_with_place(self): - p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda( - ) else fluid.CPUPlace() - - pserver_pid = self.start_pserver(self._ps_endpoints) - self._wait_ps_ready(pserver_pid) - - self.run_trainer(p, 0) - - self.stop_pserver(pserver_pid) - - def run_trainer(self, place, trainer_id): - test_program, avg_cost, train_reader, test_reader, predict = get_model() - t = get_transpiler(trainer_id, - fluid.default_main_program(), self._ps_endpoints, - self._trainers) - - trainer_prog = t.get_trainer_program() - - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - - use_gpu = True if core.is_compiled_with_cuda() else False - - exec_strategy = ExecutionStrategy() - exec_strategy.use_cuda = use_gpu - train_exe = fluid.ParallelExecutor( - use_cuda=use_gpu, - main_program=trainer_prog, - loss_name=avg_cost.name, - exec_strategy=exec_strategy) +from test_dist_base import TestDistBase - feed_var_list = [ - var for var in trainer_prog.global_block().vars.values() - if var.is_data - ] - feeder = fluid.DataFeeder(feed_var_list, place) - for pass_id in range(10): - for batch_id, data in enumerate(train_reader()): - avg_loss_np = train_exe.run(feed=feeder.feed(data), - fetch_list=[avg_cost.name]) - loss = np.array(avg_loss_np).mean() - if float(loss) < 5.0: - return - if math.isnan(loss): - assert ("Got Nan loss, training failed") +class TestDistSeResneXt2x2(TestDistBase): + def test_se_resnext(self): + self.check_with_place("dist_word2vec.py", delta=1e-7) if __name__ == "__main__": diff --git a/python/paddle/v2/dataset/wmt14.py b/python/paddle/v2/dataset/wmt14.py index 1ec210f265049c8b62cd99cd218f25a9f846ef43..b9e602f324ad9bf43416b420c6d5697050a5c802 100644 --- a/python/paddle/v2/dataset/wmt14.py +++ b/python/paddle/v2/dataset/wmt14.py @@ -15,7 +15,7 @@ WMT14 dataset. The original WMT14 dataset is too large and a small set of data for set is provided. This module will download dataset from -http://paddlepaddle.cdn.bcebos.com/demo/wmt_shrinked_data/wmt14.tgz and +http://paddlemodels.bj.bcebos.com/wmt/wmt14.tgz and parse training set and test set into paddle reader creators. """ @@ -37,8 +37,7 @@ URL_DEV_TEST = ('http://www-lium.univ-lemans.fr/~schwenk/' MD5_DEV_TEST = '7d7897317ddd8ba0ae5c5fa7248d3ff5' # this is a small set of data for test. The original data is too large and # will be add later. -URL_TRAIN = ('http://paddlepaddle.cdn.bcebos.com/demo/' - 'wmt_shrinked_data/wmt14.tgz') +URL_TRAIN = ('http://paddlemodels.bj.bcebos.com/wmt/wmt14.tgz') MD5_TRAIN = '0791583d57d5beb693b9414c5b36798c' # BLEU of this trained model is 26.92 URL_MODEL = 'http://paddlemodels.bj.bcebos.com/wmt%2Fwmt14.tgz' diff --git a/tools/diff_api.py b/tools/diff_api.py index cf9f2c72cb78ddf88ff2a7bb1c0ee4b00ec0ec96..97c739ed2a5627ad9fd326f206976a4579dc26a3 100644 --- a/tools/diff_api.py +++ b/tools/diff_api.py @@ -20,9 +20,7 @@ for each_diff in result: if each_diff[0] in ['-', '?']: # delete or change API is not allowed error = True elif each_diff[0] == '+': - # only new layers is allowed. - if not each_diff.startswith('+ paddle.fluid.layers.'): - error = True + error = True if each_diff[0] != ' ': print(each_diff) diff --git a/tools/manylinux1/Dockerfile.x64 b/tools/manylinux1/Dockerfile.x64 index 0b72ea323b72a1a6cfd0911416c4037243d06ff4..0d59e4c110ff8502acb4dbcda15f855f7652a946 100644 --- a/tools/manylinux1/Dockerfile.x64 +++ b/tools/manylinux1/Dockerfile.x64 @@ -40,11 +40,13 @@ RUN wget -O /root/requirements.txt https://raw.githubusercontent.com/PaddlePaddl RUN LD_LIBRARY_PATH=/opt/_internal/cpython-2.7.11-ucs4/lib:${LD_LIBRARY_PATH} /opt/python/cp27-cp27mu/bin/pip install -r /root/requirements.txt && \ LD_LIBRARY_PATH=/opt/_internal/cpython-2.7.11-ucs2/lib:${LD_LIBRARY_PATH} /opt/python/cp27-cp27m/bin/pip install -r /root/requirements.txt && \ + LD_LIBRARY_PATH=/opt/_internal/cpython-3.5.1/lib/:${LD_LIBRARY_PATH} /opt/_internal/cpython-3.5.1/bin/pip3 install -r /root/requirements.txt && \ go get github.com/Masterminds/glide && \ rm -rf /root/requirements.txt RUN LD_LIBRARY_PATH=/opt/_internal/cpython-2.7.11-ucs4/lib:${LD_LIBRARY_PATH} /opt/python/cp27-cp27mu/bin/pip install pre-commit 'ipython==5.3.0' opencv-python && \ - LD_LIBRARY_PATH=/opt/_internal/cpython-2.7.11-ucs2/lib:${LD_LIBRARY_PATH} /opt/python/cp27-cp27m/bin/pip install pre-commit 'ipython==5.3.0' opencv-python + LD_LIBRARY_PATH=/opt/_internal/cpython-2.7.11-ucs2/lib:${LD_LIBRARY_PATH} /opt/python/cp27-cp27m/bin/pip install pre-commit 'ipython==5.3.0' opencv-python && \ + LD_LIBRARY_PATH=/opt/_internal/cpython-3.5.1/lib/:${LD_LIBRARY_PATH} /opt/_internal/cpython-3.5.1/bin/pip3 install pre-commit 'ipython==5.3.0' opencv-python RUN wget -O /opt/swig-2.0.12.tar.gz https://cytranet.dl.sourceforge.net/project/swig/swig/swig-2.0.12/swig-2.0.12.tar.gz && \ cd /opt && tar xzf swig-2.0.12.tar.gz && cd /opt/swig-2.0.12 && ./configure && make && make install && cd /opt && rm swig-2.0.12.tar.gz